Compare commits

..

16 Commits

Author SHA1 Message Date
Stefan Agner
9126e3bb49 Fix pytest 2025-11-05 09:34:15 +01:00
Stefan Agner
0e506efcfb Exclude Core from architecture deprecation checks
This allows to download the latest available Core version still, even
on deprecated systems.
2025-11-04 17:50:08 +01:00
Stefan Agner
bd636defe8 Deprecate i386, armhf and armv7 Supervisor architectures 2025-11-04 17:24:12 +01:00
Stefan Agner
1448a33dbf Remove Codenotary integrity check (#6236)
* Formally deprecate CodeNotary build config

* Remove CodeNotary specific integrity checking

The current code is specific to how CodeNotary was doing integrity
checking. A future integrity checking mechanism likely will work
differently (e.g. through EROFS based containers). Remove the current
code to make way for a future implementation.

* Drop CodeNotary integrity fixups

* Drop unused tests

* Fix pytest

* Fix pytest

* Remove CodeNotary related exceptions and handling

Remove CodeNotary related exceptions and handling from the Docker
interface.

* Drop unnecessary comment

* Remove Codenotary specific IssueType/SuggestionType

* Drop Codenotary specific environment and secret reference

* Remove unused constants

* Introduce APIGone exception for removed APIs

Introduce a new exception class APIGone to indicate that certain API
features have been removed and are no longer available. Update the
security integrity check endpoint to raise this new exception instead
of a generic APIError, providing clearer communication to clients that
the feature has been intentionally removed.

* Drop content trust

A cosign based signature verification will likely be named differently
to avoid confusion with existing implementations. For now, remove the
content trust option entirely.

* Drop code sign test

* Remove source_mods/content_trust evaluations

* Remove content_trust reference in bootstrap.py

* Fix security tests

* Drop unused tests

* Drop codenotary from schema

Since we have "remove extra" in voluptuous, we can remove the
codenotary field from the addon schema.

* Remove content_trust from tests

* Remove content_trust unsupported reason

* Remove unnecessary comment

* Remove unrelated pytest

* Remove unrelated fixtures
2025-11-03 20:13:15 +01:00
Stefan Agner
1657769044 Fix parent job sync when parent reaches 100% progress (#6282)
* Fix parent job sync when parent reaches 100% progress

When a child job completes and syncs its progress to a parent job,
it can set the parent to 100% progress. However, there are situations
where a second child job for the same parent job is created after the
parent has already reached 100%. This caused a ValueError when creating
a ParentJobSync for subsequent child jobs, as the validator required
starting_progress < 100.0.

This issue was introduced in #6207 (shipped in 2025.10.0) but only
became visible in 2025.10.1 with #6195, which started using progress
syncs. The bug manifests during Core update rollbacks when a second
docker_interface_install job is created after the parent already
reached 100% from the first install.

Fix by skipping parent job sync setup when the parent is already done
or at 100% progress, as there's no value in syncing progress to a
completed parent.

* Update comment and add debug message
2025-11-03 17:58:40 +01:00
Copilot
a8b7923a42 Add test coverage for _map_nm_wifi method (#6275)
* Initial plan

* Add comprehensive test coverage for _map_nm_wifi method

Co-authored-by: agners <34061+agners@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: agners <34061+agners@users.noreply.github.com>
2025-11-03 10:04:01 +01:00
dependabot[bot]
b3b7bc29fa Bump ruff from 0.14.2 to 0.14.3 (#6280)
* Bump ruff from 0.14.2 to 0.14.3

Bumps [ruff](https://github.com/astral-sh/ruff) from 0.14.2 to 0.14.3.
- [Release notes](https://github.com/astral-sh/ruff/releases)
- [Changelog](https://github.com/astral-sh/ruff/blob/main/CHANGELOG.md)
- [Commits](https://github.com/astral-sh/ruff/compare/0.14.2...0.14.3)

---
updated-dependencies:
- dependency-name: ruff
  dependency-version: 0.14.3
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>

* Update precommit ruff too

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Mike Degatano <michael.degatano@gmail.com>
2025-10-31 11:42:39 -04:00
dependabot[bot]
2098168d04 Bump sentry-sdk from 2.42.1 to 2.43.0 (#6279)
Bumps [sentry-sdk](https://github.com/getsentry/sentry-python) from 2.42.1 to 2.43.0.
- [Release notes](https://github.com/getsentry/sentry-python/releases)
- [Changelog](https://github.com/getsentry/sentry-python/blob/master/CHANGELOG.md)
- [Commits](https://github.com/getsentry/sentry-python/compare/2.42.1...2.43.0)

---
updated-dependencies:
- dependency-name: sentry-sdk
  dependency-version: 2.43.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-10-30 14:24:50 -04:00
dependabot[bot]
02c4fd4a8c Bump aiohttp from 3.13.1 to 3.13.2 (#6273)
---
updated-dependencies:
- dependency-name: aiohttp
  dependency-version: 3.13.2
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-10-29 10:36:28 +01:00
Erwin Douna
0bee5c6f37 Add exponential backoff strategy to watchdog restart (#6262) 2025-10-28 17:46:40 +01:00
dependabot[bot]
9c0174f1fd Bump actions/upload-artifact from 4.6.2 to 5.0.0 (#6269)
Bumps [actions/upload-artifact](https://github.com/actions/upload-artifact) from 4.6.2 to 5.0.0.
- [Release notes](https://github.com/actions/upload-artifact/releases)
- [Commits](ea165f8d65...330a01c490)

---
updated-dependencies:
- dependency-name: actions/upload-artifact
  dependency-version: 5.0.0
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-10-27 12:28:43 +01:00
dependabot[bot]
dc3d8b9266 Bump actions/download-artifact from 5.0.0 to 6.0.0 (#6270)
Bumps [actions/download-artifact](https://github.com/actions/download-artifact) from 5.0.0 to 6.0.0.
- [Release notes](https://github.com/actions/download-artifact/releases)
- [Commits](634f93cb29...018cc2cf5b)

---
updated-dependencies:
- dependency-name: actions/download-artifact
  dependency-version: 6.0.0
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-10-27 12:28:13 +01:00
dependabot[bot]
06d96db55b Bump orjson from 3.11.3 to 3.11.4 (#6271)
Bumps [orjson](https://github.com/ijl/orjson) from 3.11.3 to 3.11.4.
- [Release notes](https://github.com/ijl/orjson/releases)
- [Changelog](https://github.com/ijl/orjson/blob/master/CHANGELOG.md)
- [Commits](https://github.com/ijl/orjson/compare/3.11.3...3.11.4)

---
updated-dependencies:
- dependency-name: orjson
  dependency-version: 3.11.4
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-10-27 12:27:39 +01:00
Mike Degatano
131cc3b6d1 Prevent float drift error on progress update (#6267) 2025-10-24 09:37:19 -04:00
Michel Nederlof
b92f5976a3 If D-Bus to UDisks2 is not connected, return None (#6265)
When fetching the host info without UDisks2 D-Bus connected it would raise an exception and disable managing addons via home assistant (and other GUI functions that need host info status).
2025-10-24 10:22:02 +02:00
dependabot[bot]
370c961c9e Bump ruff from 0.14.1 to 0.14.2 (#6268)
Bumps [ruff](https://github.com/astral-sh/ruff) from 0.14.1 to 0.14.2.
- [Release notes](https://github.com/astral-sh/ruff/releases)
- [Changelog](https://github.com/astral-sh/ruff/blob/main/CHANGELOG.md)
- [Commits](https://github.com/astral-sh/ruff/compare/0.14.1...0.14.2)

---
updated-dependencies:
- dependency-name: ruff
  dependency-version: 0.14.2
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-10-24 09:46:04 +02:00
51 changed files with 397 additions and 1295 deletions

View File

@@ -170,8 +170,6 @@ jobs:
--target /data \
--cosign \
--generic ${{ needs.init.outputs.version }}
env:
CAS_API_KEY: ${{ secrets.CAS_TOKEN }}
version:
name: Update version
@@ -293,33 +291,6 @@ jobs:
exit 1
fi
- name: Check the Supervisor code sign
if: needs.init.outputs.publish == 'true'
run: |
echo "Enable Content-Trust"
test=$(docker exec hassio_cli ha security options --content-trust=true --no-progress --raw-json | jq -r '.result')
if [ "$test" != "ok" ]; then
exit 1
fi
echo "Run supervisor health check"
test=$(docker exec hassio_cli ha resolution healthcheck --no-progress --raw-json | jq -r '.result')
if [ "$test" != "ok" ]; then
exit 1
fi
echo "Check supervisor unhealthy"
test=$(docker exec hassio_cli ha resolution info --no-progress --raw-json | jq -r '.data.unhealthy[]')
if [ "$test" != "" ]; then
exit 1
fi
echo "Check supervisor supported"
test=$(docker exec hassio_cli ha resolution info --no-progress --raw-json | jq -r '.data.unsupported[]')
if [[ "$test" =~ source_mods ]]; then
exit 1
fi
- name: Create full backup
id: backup
run: |

View File

@@ -386,7 +386,7 @@ jobs:
-o console_output_style=count \
tests
- name: Upload coverage artifact
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2
uses: actions/upload-artifact@330a01c490aca151604b8cf639adc76d48f6c5d4 # v5.0.0
with:
name: coverage
path: .coverage
@@ -417,7 +417,7 @@ jobs:
echo "Failed to restore Python virtual environment from cache"
exit 1
- name: Download all coverage artifacts
uses: actions/download-artifact@634f93cb2916e3fdff6788551b99b062d0335ce0 # v5.0.0
uses: actions/download-artifact@018cc2cf5baa6db3ef3c5f8a56943fffe632ef53 # v6.0.0
with:
name: coverage
path: coverage/

View File

@@ -1,6 +1,6 @@
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.11.10
rev: v0.14.3
hooks:
- id: ruff
args:

View File

@@ -1,5 +1,5 @@
aiodns==3.5.0
aiohttp==3.13.1
aiohttp==3.13.2
atomicwrites-homeassistant==1.4.1
attrs==25.4.0
awesomeversion==25.8.0
@@ -17,13 +17,13 @@ faust-cchardet==2.1.19
gitpython==3.1.45
jinja2==3.1.6
log-rate-limit==1.4.2
orjson==3.11.3
orjson==3.11.4
pulsectl==24.12.0
pyudev==0.24.4
PyYAML==6.0.3
requests==2.32.5
securetar==2025.2.1
sentry-sdk==2.42.1
sentry-sdk==2.43.0
setuptools==80.9.0
voluptuous==0.15.2
dbus-fast==2.44.5

View File

@@ -8,7 +8,7 @@ pytest-asyncio==0.25.2
pytest-cov==7.0.0
pytest-timeout==2.4.0
pytest==8.4.2
ruff==0.14.1
ruff==0.14.3
time-machine==2.19.0
types-docker==7.1.0.20251009
types-pyyaml==6.0.12.20250915

View File

@@ -1513,13 +1513,6 @@ class Addon(AddonModel):
_LOGGER.info("Finished restore for add-on %s", self.slug)
return wait_for_start
def check_trust(self) -> Awaitable[None]:
"""Calculate Addon docker content trust.
Return Coroutine.
"""
return self.instance.check_trust()
@Job(
name="addon_restart_after_problem",
throttle_period=WATCHDOG_THROTTLE_PERIOD,
@@ -1562,7 +1555,15 @@ class Addon(AddonModel):
)
break
await asyncio.sleep(WATCHDOG_RETRY_SECONDS)
# Exponential backoff to spread retries over the throttle window
delay = WATCHDOG_RETRY_SECONDS * (1 << max(attempts - 1, 0))
_LOGGER.debug(
"Watchdog will retry addon %s in %s seconds (attempt %s)",
self.name,
delay,
attempts + 1,
)
await asyncio.sleep(delay)
async def container_state_changed(self, event: DockerContainerStateEvent) -> None:
"""Set addon state from container state."""

View File

@@ -103,7 +103,6 @@ from .configuration import FolderMapping
from .const import (
ATTR_BACKUP,
ATTR_BREAKING_VERSIONS,
ATTR_CODENOTARY,
ATTR_PATH,
ATTR_READ_ONLY,
AddonBackupMode,
@@ -632,13 +631,8 @@ class AddonModel(JobGroup, ABC):
@property
def signed(self) -> bool:
"""Return True if the image is signed."""
return ATTR_CODENOTARY in self.data
@property
def codenotary(self) -> str | None:
"""Return Signer email address for CAS."""
return self.data.get(ATTR_CODENOTARY)
"""Currently no signing support."""
return False
@property
def breaking_versions(self) -> list[AwesomeVersion]:

View File

@@ -207,6 +207,12 @@ def _warn_addon_config(config: dict[str, Any]):
name,
)
if ATTR_CODENOTARY in config:
_LOGGER.warning(
"Add-on '%s' uses deprecated 'codenotary' field in config. This field is no longer used and will be ignored. Please report this to the maintainer.",
name,
)
return config
@@ -417,7 +423,6 @@ _SCHEMA_ADDON_CONFIG = vol.Schema(
vol.Optional(ATTR_BACKUP, default=AddonBackupMode.HOT): vol.Coerce(
AddonBackupMode
),
vol.Optional(ATTR_CODENOTARY): vol.Email(),
vol.Optional(ATTR_OPTIONS, default={}): dict,
vol.Optional(ATTR_SCHEMA, default={}): vol.Any(
vol.Schema({str: SCHEMA_ELEMENT}),

View File

@@ -253,16 +253,6 @@ class APIIngress(CoreSysAttributes):
skip_auto_headers={hdrs.CONTENT_TYPE},
) as result:
headers = _response_header(result)
# Empty body responses (304, 204, HEAD, etc.) should not be streamed,
# otherwise aiohttp < 3.9.0 may generate an invalid "0\r\n\r\n" chunk
# This also avoids setting content_type for empty responses.
if must_be_empty_body(request.method, result.status):
return web.Response(
headers=headers,
status=result.status,
)
# Avoid parsing content_type in simple cases for better performance
if maybe_content_type := result.headers.get(hdrs.CONTENT_TYPE):
content_type = (maybe_content_type.partition(";"))[0].strip()
@@ -270,7 +260,11 @@ class APIIngress(CoreSysAttributes):
content_type = result.content_type
# Simple request
if (
hdrs.CONTENT_LENGTH in result.headers
# empty body responses should not be streamed,
# otherwise aiohttp < 3.9.0 may generate
# an invalid "0\r\n\r\n" chunk instead of an empty response.
must_be_empty_body(request.method, result.status)
or hdrs.CONTENT_LENGTH in result.headers
and int(result.headers.get(hdrs.CONTENT_LENGTH, 0)) < 4_194_000
):
# Return Response

View File

@@ -1,24 +1,20 @@
"""Init file for Supervisor Security RESTful API."""
import asyncio
import logging
from typing import Any
from aiohttp import web
import attr
import voluptuous as vol
from ..const import ATTR_CONTENT_TRUST, ATTR_FORCE_SECURITY, ATTR_PWNED
from supervisor.exceptions import APIGone
from ..const import ATTR_FORCE_SECURITY, ATTR_PWNED
from ..coresys import CoreSysAttributes
from .utils import api_process, api_validate
_LOGGER: logging.Logger = logging.getLogger(__name__)
# pylint: disable=no-value-for-parameter
SCHEMA_OPTIONS = vol.Schema(
{
vol.Optional(ATTR_PWNED): vol.Boolean(),
vol.Optional(ATTR_CONTENT_TRUST): vol.Boolean(),
vol.Optional(ATTR_FORCE_SECURITY): vol.Boolean(),
}
)
@@ -31,7 +27,6 @@ class APISecurity(CoreSysAttributes):
async def info(self, request: web.Request) -> dict[str, Any]:
"""Return Security information."""
return {
ATTR_CONTENT_TRUST: self.sys_security.content_trust,
ATTR_PWNED: self.sys_security.pwned,
ATTR_FORCE_SECURITY: self.sys_security.force,
}
@@ -43,8 +38,6 @@ class APISecurity(CoreSysAttributes):
if ATTR_PWNED in body:
self.sys_security.pwned = body[ATTR_PWNED]
if ATTR_CONTENT_TRUST in body:
self.sys_security.content_trust = body[ATTR_CONTENT_TRUST]
if ATTR_FORCE_SECURITY in body:
self.sys_security.force = body[ATTR_FORCE_SECURITY]
@@ -54,6 +47,9 @@ class APISecurity(CoreSysAttributes):
@api_process
async def integrity_check(self, request: web.Request) -> dict[str, Any]:
"""Run backend integrity check."""
result = await asyncio.shield(self.sys_security.integrity_check())
return attr.asdict(result)
"""Run backend integrity check.
CodeNotary integrity checking has been removed. This endpoint now returns
an error indicating the feature is gone.
"""
raise APIGone("Integrity check feature has been removed.")

View File

@@ -16,14 +16,12 @@ from ..const import (
ATTR_BLK_READ,
ATTR_BLK_WRITE,
ATTR_CHANNEL,
ATTR_CONTENT_TRUST,
ATTR_COUNTRY,
ATTR_CPU_PERCENT,
ATTR_DEBUG,
ATTR_DEBUG_BLOCK,
ATTR_DETECT_BLOCKING_IO,
ATTR_DIAGNOSTICS,
ATTR_FORCE_SECURITY,
ATTR_HEALTHY,
ATTR_ICON,
ATTR_IP_ADDRESS,
@@ -69,8 +67,6 @@ SCHEMA_OPTIONS = vol.Schema(
vol.Optional(ATTR_DEBUG): vol.Boolean(),
vol.Optional(ATTR_DEBUG_BLOCK): vol.Boolean(),
vol.Optional(ATTR_DIAGNOSTICS): vol.Boolean(),
vol.Optional(ATTR_CONTENT_TRUST): vol.Boolean(),
vol.Optional(ATTR_FORCE_SECURITY): vol.Boolean(),
vol.Optional(ATTR_AUTO_UPDATE): vol.Boolean(),
vol.Optional(ATTR_DETECT_BLOCKING_IO): vol.Coerce(DetectBlockingIO),
vol.Optional(ATTR_COUNTRY): str,

View File

@@ -105,7 +105,6 @@ async def initialize_coresys() -> CoreSys:
if coresys.dev:
coresys.updater.channel = UpdateChannel.DEV
coresys.security.content_trust = False
# Convert datetime
logging.Formatter.converter = lambda *args: coresys.now().timetuple()

View File

@@ -846,16 +846,6 @@ class DockerAddon(DockerInterface):
):
self.sys_resolution.dismiss_issue(self.addon.device_access_missing_issue)
async def _validate_trust(self, image_id: str) -> None:
"""Validate trust of content."""
if not self.addon.signed:
return
checksum = image_id.partition(":")[2]
return await self.sys_security.verify_content(
cast(str, self.addon.codenotary), checksum
)
@Job(
name="docker_addon_hardware_events",
conditions=[JobCondition.OS_AGENT],

View File

@@ -5,7 +5,7 @@ from ipaddress import IPv4Address
import logging
import re
from awesomeversion import AwesomeVersion, AwesomeVersionCompareException
from awesomeversion import AwesomeVersion
from docker.types import Mount
from ..const import LABEL_MACHINE
@@ -244,13 +244,3 @@ class DockerHomeAssistant(DockerInterface):
self.image,
self.sys_homeassistant.version,
)
async def _validate_trust(self, image_id: str) -> None:
"""Validate trust of content."""
try:
if self.version in {None, LANDINGPAGE} or self.version < _VERIFY_TRUST:
return
except AwesomeVersionCompareException:
return
await super()._validate_trust(image_id)

View File

@@ -31,15 +31,12 @@ from ..const import (
)
from ..coresys import CoreSys
from ..exceptions import (
CodeNotaryError,
CodeNotaryUntrusted,
DockerAPIError,
DockerError,
DockerJobError,
DockerLogOutOfOrder,
DockerNotFound,
DockerRequestError,
DockerTrustError,
)
from ..jobs import SupervisorJob
from ..jobs.const import JOB_GROUP_DOCKER_INTERFACE, JobConcurrency
@@ -306,6 +303,8 @@ class DockerInterface(JobGroup, ABC):
# Our filters have all passed. Time to update the job
# Only downloading and extracting have progress details. Use that to set extra
# We'll leave it around on later stages as the total bytes may be useful after that stage
# Enforce range to prevent float drift error
progress = max(0, min(progress, 100))
if (
stage in {PullImageLayerStage.DOWNLOADING, PullImageLayerStage.EXTRACTING}
and reference.progress_detail
@@ -371,7 +370,7 @@ class DockerInterface(JobGroup, ABC):
# To reduce noise, limit updates to when result has changed by an entire percent or when stage changed
if stage != install_job.stage or progress >= install_job.progress + 1:
install_job.update(stage=stage.status, progress=progress)
install_job.update(stage=stage.status, progress=max(0, min(progress, 100)))
@Job(
name="docker_interface_install",
@@ -423,18 +422,6 @@ class DockerInterface(JobGroup, ABC):
platform=MAP_ARCH[image_arch],
)
# Validate content
try:
await self._validate_trust(cast(str, docker_image.id))
except CodeNotaryError:
with suppress(docker.errors.DockerException):
await self.sys_run_in_executor(
self.sys_docker.images.remove,
image=f"{image}:{version!s}",
force=True,
)
raise
# Tag latest
if latest:
_LOGGER.info(
@@ -460,16 +447,6 @@ class DockerInterface(JobGroup, ABC):
raise DockerError(
f"Unknown error with {image}:{version!s} -> {err!s}", _LOGGER.error
) from err
except CodeNotaryUntrusted as err:
raise DockerTrustError(
f"Pulled image {image}:{version!s} failed on content-trust verification!",
_LOGGER.critical,
) from err
except CodeNotaryError as err:
raise DockerTrustError(
f"Error happened on Content-Trust check for {image}:{version!s}: {err!s}",
_LOGGER.error,
) from err
finally:
if listener:
self.sys_bus.remove_listener(listener)
@@ -807,24 +784,3 @@ class DockerInterface(JobGroup, ABC):
return self.sys_run_in_executor(
self.sys_docker.container_run_inside, self.name, command
)
async def _validate_trust(self, image_id: str) -> None:
"""Validate trust of content."""
checksum = image_id.partition(":")[2]
return await self.sys_security.verify_own_content(checksum)
@Job(
name="docker_interface_check_trust",
on_condition=DockerJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def check_trust(self) -> None:
"""Check trust of exists Docker image."""
try:
image = await self.sys_run_in_executor(
self.sys_docker.images.get, f"{self.image}:{self.version!s}"
)
except (docker.errors.DockerException, requests.RequestException):
return
await self._validate_trust(cast(str, image.id))

View File

@@ -423,6 +423,12 @@ class APINotFound(APIError):
status = 404
class APIGone(APIError):
"""API is no longer available."""
status = 410
class APIAddonNotInstalled(APIError):
"""Not installed addon requested at addons API."""
@@ -577,21 +583,6 @@ class PwnedConnectivityError(PwnedError):
"""Connectivity errors while checking pwned passwords."""
# util/codenotary
class CodeNotaryError(HassioError):
"""Error general with CodeNotary."""
class CodeNotaryUntrusted(CodeNotaryError):
"""Error on untrusted content."""
class CodeNotaryBackendError(CodeNotaryError):
"""CodeNotary backend error happening."""
# util/whoami

View File

@@ -9,7 +9,12 @@ from typing import Any
from supervisor.resolution.const import UnhealthyReason
from ..coresys import CoreSys, CoreSysAttributes
from ..exceptions import DBusError, DBusObjectError, HardwareNotFound
from ..exceptions import (
DBusError,
DBusNotConnectedError,
DBusObjectError,
HardwareNotFound,
)
from .const import UdevSubsystem
from .data import Device
@@ -207,6 +212,8 @@ class HwDisk(CoreSysAttributes):
try:
block_device = self.sys_dbus.udisks2.get_block_device_by_path(device_path)
drive = self.sys_dbus.udisks2.get_drive(block_device.drive)
except DBusNotConnectedError:
return None
except DBusObjectError:
_LOGGER.warning(
"Unable to find UDisks2 drive for device at %s", device_path.as_posix()

View File

@@ -428,13 +428,6 @@ class HomeAssistantCore(JobGroup):
"""
return self.instance.logs()
def check_trust(self) -> Awaitable[None]:
"""Calculate HomeAssistant docker content trust.
Return Coroutine.
"""
return self.instance.check_trust()
async def stats(self) -> DockerStats:
"""Return stats of Home Assistant."""
try:

View File

@@ -327,6 +327,17 @@ class JobManager(FileConfiguration, CoreSysAttributes):
if not curr_parent.child_job_syncs:
continue
# HACK: If parent trigger the same child job, we just skip this second
# sync. Maybe it would be better to have this reflected in the job stage
# and reset progress to 0 instead? There is no support for such stage
# information on Core update entities today though.
if curr_parent.done is True or curr_parent.progress >= 100:
_LOGGER.debug(
"Skipping parent job sync for done parent job %s",
curr_parent.name,
)
continue
# Break after first match at each parent as it doesn't make sense
# to match twice. But it could match multiple parents
for sync in curr_parent.child_job_syncs:

View File

@@ -34,6 +34,7 @@ class JobCondition(StrEnum):
PLUGINS_UPDATED = "plugins_updated"
RUNNING = "running"
SUPERVISOR_UPDATED = "supervisor_updated"
ARCHITECTURE_SUPPORTED = "architecture_supported"
class JobConcurrency(StrEnum):

View File

@@ -441,6 +441,14 @@ class Job(CoreSysAttributes):
raise JobConditionException(
f"'{method_name}' blocked from execution, supervisor needs to be updated first"
)
if (
JobCondition.ARCHITECTURE_SUPPORTED in used_conditions
and UnsupportedReason.SYSTEM_ARCHITECTURE
in coresys.sys_resolution.unsupported
):
raise JobConditionException(
f"'{method_name}' blocked from execution, unsupported system architecture"
)
if JobCondition.PLUGINS_UPDATED in used_conditions and (
out_of_date := [

View File

@@ -161,6 +161,7 @@ class Tasks(CoreSysAttributes):
JobCondition.INTERNET_HOST,
JobCondition.OS_SUPPORTED,
JobCondition.RUNNING,
JobCondition.ARCHITECTURE_SUPPORTED,
],
concurrency=JobConcurrency.REJECT,
)

View File

@@ -76,13 +76,6 @@ class PluginBase(ABC, FileConfiguration, CoreSysAttributes):
"""Return True if a task is in progress."""
return self.instance.in_progress
def check_trust(self) -> Awaitable[None]:
"""Calculate plugin docker content trust.
Return Coroutine.
"""
return self.instance.check_trust()
def logs(self) -> Awaitable[bytes]:
"""Get docker plugin logs.

View File

@@ -23,4 +23,5 @@ PLUGIN_UPDATE_CONDITIONS = [
JobCondition.HEALTHY,
JobCondition.INTERNET_HOST,
JobCondition.SUPERVISOR_UPDATED,
JobCondition.ARCHITECTURE_SUPPORTED,
]

View File

@@ -1,59 +0,0 @@
"""Helpers to check supervisor trust."""
import logging
from ...const import CoreState
from ...coresys import CoreSys
from ...exceptions import CodeNotaryError, CodeNotaryUntrusted
from ..const import ContextType, IssueType, UnhealthyReason
from .base import CheckBase
_LOGGER: logging.Logger = logging.getLogger(__name__)
def setup(coresys: CoreSys) -> CheckBase:
"""Check setup function."""
return CheckSupervisorTrust(coresys)
class CheckSupervisorTrust(CheckBase):
"""CheckSystemTrust class for check."""
async def run_check(self) -> None:
"""Run check if not affected by issue."""
if not self.sys_security.content_trust:
_LOGGER.warning(
"Skipping %s, content_trust is globally disabled", self.slug
)
return
try:
await self.sys_supervisor.check_trust()
except CodeNotaryUntrusted:
self.sys_resolution.add_unhealthy_reason(UnhealthyReason.UNTRUSTED)
self.sys_resolution.create_issue(IssueType.TRUST, ContextType.SUPERVISOR)
except CodeNotaryError:
pass
async def approve_check(self, reference: str | None = None) -> bool:
"""Approve check if it is affected by issue."""
try:
await self.sys_supervisor.check_trust()
except CodeNotaryError:
return True
return False
@property
def issue(self) -> IssueType:
"""Return a IssueType enum."""
return IssueType.TRUST
@property
def context(self) -> ContextType:
"""Return a ContextType enum."""
return ContextType.SUPERVISOR
@property
def states(self) -> list[CoreState]:
"""Return a list of valid states when this check can run."""
return [CoreState.RUNNING, CoreState.STARTUP]

View File

@@ -39,7 +39,6 @@ class UnsupportedReason(StrEnum):
APPARMOR = "apparmor"
CGROUP_VERSION = "cgroup_version"
CONNECTIVITY_CHECK = "connectivity_check"
CONTENT_TRUST = "content_trust"
DBUS = "dbus"
DNS_SERVER = "dns_server"
DOCKER_CONFIGURATION = "docker_configuration"
@@ -54,12 +53,12 @@ class UnsupportedReason(StrEnum):
PRIVILEGED = "privileged"
RESTART_POLICY = "restart_policy"
SOFTWARE = "software"
SOURCE_MODS = "source_mods"
SUPERVISOR_VERSION = "supervisor_version"
SYSTEMD = "systemd"
SYSTEMD_JOURNAL = "systemd_journal"
SYSTEMD_RESOLVED = "systemd_resolved"
VIRTUALIZATION_IMAGE = "virtualization_image"
SYSTEM_ARCHITECTURE = "system_architecture"
class UnhealthyReason(StrEnum):
@@ -103,7 +102,6 @@ class IssueType(StrEnum):
PWNED = "pwned"
REBOOT_REQUIRED = "reboot_required"
SECURITY = "security"
TRUST = "trust"
UPDATE_FAILED = "update_failed"
UPDATE_ROLLBACK = "update_rollback"
@@ -115,7 +113,6 @@ class SuggestionType(StrEnum):
CLEAR_FULL_BACKUP = "clear_full_backup"
CREATE_FULL_BACKUP = "create_full_backup"
DISABLE_BOOT = "disable_boot"
EXECUTE_INTEGRITY = "execute_integrity"
EXECUTE_REBOOT = "execute_reboot"
EXECUTE_REBUILD = "execute_rebuild"
EXECUTE_RELOAD = "execute_reload"

View File

@@ -1,72 +0,0 @@
"""Evaluation class for Content Trust."""
import errno
import logging
from pathlib import Path
from ...const import CoreState
from ...coresys import CoreSys
from ...exceptions import CodeNotaryError, CodeNotaryUntrusted
from ...utils.codenotary import calc_checksum_path_sourcecode
from ..const import ContextType, IssueType, UnhealthyReason, UnsupportedReason
from .base import EvaluateBase
_SUPERVISOR_SOURCE = Path("/usr/src/supervisor/supervisor")
_LOGGER: logging.Logger = logging.getLogger(__name__)
def setup(coresys: CoreSys) -> EvaluateBase:
"""Initialize evaluation-setup function."""
return EvaluateSourceMods(coresys)
class EvaluateSourceMods(EvaluateBase):
"""Evaluate supervisor source modifications."""
@property
def reason(self) -> UnsupportedReason:
"""Return a UnsupportedReason enum."""
return UnsupportedReason.SOURCE_MODS
@property
def on_failure(self) -> str:
"""Return a string that is printed when self.evaluate is True."""
return "System detect unauthorized source code modifications."
@property
def states(self) -> list[CoreState]:
"""Return a list of valid states when this evaluation can run."""
return [CoreState.RUNNING]
async def evaluate(self) -> bool:
"""Run evaluation."""
if not self.sys_security.content_trust:
_LOGGER.warning("Disabled content-trust, skipping evaluation")
return False
# Calculate sume of the sourcecode
try:
checksum = await self.sys_run_in_executor(
calc_checksum_path_sourcecode, _SUPERVISOR_SOURCE
)
except OSError as err:
if err.errno == errno.EBADMSG:
self.sys_resolution.add_unhealthy_reason(
UnhealthyReason.OSERROR_BAD_MESSAGE
)
self.sys_resolution.create_issue(
IssueType.CORRUPT_FILESYSTEM, ContextType.SYSTEM
)
_LOGGER.error("Can't calculate checksum of source code: %s", err)
return False
# Validate checksum
try:
await self.sys_security.verify_own_content(checksum)
except CodeNotaryUntrusted:
return True
except CodeNotaryError:
pass
return False

View File

@@ -1,4 +1,4 @@
"""Evaluation class for Content Trust."""
"""Evaluation class for system architecture support."""
from ...const import CoreState
from ...coresys import CoreSys
@@ -8,27 +8,31 @@ from .base import EvaluateBase
def setup(coresys: CoreSys) -> EvaluateBase:
"""Initialize evaluation-setup function."""
return EvaluateContentTrust(coresys)
return EvaluateSystemArchitecture(coresys)
class EvaluateContentTrust(EvaluateBase):
"""Evaluate system content trust level."""
class EvaluateSystemArchitecture(EvaluateBase):
"""Evaluate if the current Supervisor architecture is supported."""
@property
def reason(self) -> UnsupportedReason:
"""Return a UnsupportedReason enum."""
return UnsupportedReason.CONTENT_TRUST
return UnsupportedReason.SYSTEM_ARCHITECTURE
@property
def on_failure(self) -> str:
"""Return a string that is printed when self.evaluate is True."""
return "System run with disabled trusted content security."
return "System architecture is no longer supported. Move to a supported system architecture."
@property
def states(self) -> list[CoreState]:
"""Return a list of valid states when this evaluation can run."""
return [CoreState.INITIALIZE, CoreState.SETUP, CoreState.RUNNING]
return [CoreState.INITIALIZE]
async def evaluate(self) -> bool:
async def evaluate(self):
"""Run evaluation."""
return not self.sys_security.content_trust
return self.sys_host.info.sys_arch.supervisor in {
"i386",
"armhf",
"armv7",
}

View File

@@ -1,67 +0,0 @@
"""Helpers to check and fix issues with free space."""
from datetime import timedelta
import logging
from ...coresys import CoreSys
from ...exceptions import ResolutionFixupError, ResolutionFixupJobError
from ...jobs.const import JobCondition, JobThrottle
from ...jobs.decorator import Job
from ...security.const import ContentTrustResult
from ..const import ContextType, IssueType, SuggestionType
from .base import FixupBase
_LOGGER: logging.Logger = logging.getLogger(__name__)
def setup(coresys: CoreSys) -> FixupBase:
"""Check setup function."""
return FixupSystemExecuteIntegrity(coresys)
class FixupSystemExecuteIntegrity(FixupBase):
"""Storage class for fixup."""
@Job(
name="fixup_system_execute_integrity_process",
conditions=[JobCondition.INTERNET_SYSTEM],
on_condition=ResolutionFixupJobError,
throttle_period=timedelta(hours=8),
throttle=JobThrottle.THROTTLE,
)
async def process_fixup(self, reference: str | None = None) -> None:
"""Initialize the fixup class."""
result = await self.sys_security.integrity_check()
if ContentTrustResult.FAILED in (result.core, result.supervisor):
raise ResolutionFixupError()
for plugin in result.plugins:
if plugin != ContentTrustResult.FAILED:
continue
raise ResolutionFixupError()
for addon in result.addons:
if addon != ContentTrustResult.FAILED:
continue
raise ResolutionFixupError()
@property
def suggestion(self) -> SuggestionType:
"""Return a SuggestionType enum."""
return SuggestionType.EXECUTE_INTEGRITY
@property
def context(self) -> ContextType:
"""Return a ContextType enum."""
return ContextType.SYSTEM
@property
def issues(self) -> list[IssueType]:
"""Return a IssueType enum list."""
return [IssueType.TRUST]
@property
def auto(self) -> bool:
"""Return if a fixup can be apply as auto fix."""
return True

View File

@@ -1,24 +0,0 @@
"""Security constants."""
from enum import StrEnum
import attr
class ContentTrustResult(StrEnum):
"""Content trust result enum."""
PASS = "pass"
ERROR = "error"
FAILED = "failed"
UNTESTED = "untested"
@attr.s
class IntegrityResult:
"""Result of a full integrity check."""
supervisor: ContentTrustResult = attr.ib(default=ContentTrustResult.UNTESTED)
core: ContentTrustResult = attr.ib(default=ContentTrustResult.UNTESTED)
plugins: dict[str, ContentTrustResult] = attr.ib(default={})
addons: dict[str, ContentTrustResult] = attr.ib(default={})

View File

@@ -4,27 +4,12 @@ from __future__ import annotations
import logging
from ..const import (
ATTR_CONTENT_TRUST,
ATTR_FORCE_SECURITY,
ATTR_PWNED,
FILE_HASSIO_SECURITY,
)
from ..const import ATTR_FORCE_SECURITY, ATTR_PWNED, FILE_HASSIO_SECURITY
from ..coresys import CoreSys, CoreSysAttributes
from ..exceptions import (
CodeNotaryError,
CodeNotaryUntrusted,
PwnedError,
SecurityJobError,
)
from ..jobs.const import JobConcurrency
from ..jobs.decorator import Job, JobCondition
from ..resolution.const import ContextType, IssueType, SuggestionType
from ..utils.codenotary import cas_validate
from ..exceptions import PwnedError
from ..utils.common import FileConfiguration
from ..utils.pwned import check_pwned_password
from ..validate import SCHEMA_SECURITY_CONFIG
from .const import ContentTrustResult, IntegrityResult
_LOGGER: logging.Logger = logging.getLogger(__name__)
@@ -37,16 +22,6 @@ class Security(FileConfiguration, CoreSysAttributes):
super().__init__(FILE_HASSIO_SECURITY, SCHEMA_SECURITY_CONFIG)
self.coresys = coresys
@property
def content_trust(self) -> bool:
"""Return if content trust is enabled/disabled."""
return self._data[ATTR_CONTENT_TRUST]
@content_trust.setter
def content_trust(self, value: bool) -> None:
"""Set content trust is enabled/disabled."""
self._data[ATTR_CONTENT_TRUST] = value
@property
def force(self) -> bool:
"""Return if force security is enabled/disabled."""
@@ -67,30 +42,6 @@ class Security(FileConfiguration, CoreSysAttributes):
"""Set pwned is enabled/disabled."""
self._data[ATTR_PWNED] = value
async def verify_content(self, signer: str, checksum: str) -> None:
"""Verify content on CAS."""
if not self.content_trust:
_LOGGER.warning("Disabled content-trust, skip validation")
return
try:
await cas_validate(signer, checksum)
except CodeNotaryUntrusted:
raise
except CodeNotaryError:
if self.force:
raise
self.sys_resolution.create_issue(
IssueType.TRUST,
ContextType.SYSTEM,
suggestions=[SuggestionType.EXECUTE_INTEGRITY],
)
return
async def verify_own_content(self, checksum: str) -> None:
"""Verify content from HA org."""
return await self.verify_content("notary@home-assistant.io", checksum)
async def verify_secret(self, pwned_hash: str) -> None:
"""Verify pwned state of a secret."""
if not self.pwned:
@@ -103,73 +54,3 @@ class Security(FileConfiguration, CoreSysAttributes):
if self.force:
raise
return
@Job(
name="security_manager_integrity_check",
conditions=[JobCondition.INTERNET_SYSTEM],
on_condition=SecurityJobError,
concurrency=JobConcurrency.REJECT,
)
async def integrity_check(self) -> IntegrityResult:
"""Run a full system integrity check of the platform.
We only allow to install trusted content.
This is a out of the band manual check.
"""
result: IntegrityResult = IntegrityResult()
if not self.content_trust:
_LOGGER.warning(
"Skipping integrity check, content_trust is globally disabled"
)
return result
# Supervisor
try:
await self.sys_supervisor.check_trust()
result.supervisor = ContentTrustResult.PASS
except CodeNotaryUntrusted:
result.supervisor = ContentTrustResult.ERROR
self.sys_resolution.create_issue(IssueType.TRUST, ContextType.SUPERVISOR)
except CodeNotaryError:
result.supervisor = ContentTrustResult.FAILED
# Core
try:
await self.sys_homeassistant.core.check_trust()
result.core = ContentTrustResult.PASS
except CodeNotaryUntrusted:
result.core = ContentTrustResult.ERROR
self.sys_resolution.create_issue(IssueType.TRUST, ContextType.CORE)
except CodeNotaryError:
result.core = ContentTrustResult.FAILED
# Plugins
for plugin in self.sys_plugins.all_plugins:
try:
await plugin.check_trust()
result.plugins[plugin.slug] = ContentTrustResult.PASS
except CodeNotaryUntrusted:
result.plugins[plugin.slug] = ContentTrustResult.ERROR
self.sys_resolution.create_issue(
IssueType.TRUST, ContextType.PLUGIN, reference=plugin.slug
)
except CodeNotaryError:
result.plugins[plugin.slug] = ContentTrustResult.FAILED
# Add-ons
for addon in self.sys_addons.installed:
if not addon.signed:
result.addons[addon.slug] = ContentTrustResult.UNTESTED
continue
try:
await addon.check_trust()
result.addons[addon.slug] = ContentTrustResult.PASS
except CodeNotaryUntrusted:
result.addons[addon.slug] = ContentTrustResult.ERROR
self.sys_resolution.create_issue(
IssueType.TRUST, ContextType.ADDON, reference=addon.slug
)
except CodeNotaryError:
result.addons[addon.slug] = ContentTrustResult.FAILED
return result

View File

@@ -25,8 +25,6 @@ from .coresys import CoreSys, CoreSysAttributes
from .docker.stats import DockerStats
from .docker.supervisor import DockerSupervisor
from .exceptions import (
CodeNotaryError,
CodeNotaryUntrusted,
DockerError,
HostAppArmorError,
SupervisorAppArmorError,
@@ -37,7 +35,6 @@ from .exceptions import (
from .jobs.const import JobCondition, JobThrottle
from .jobs.decorator import Job
from .resolution.const import ContextType, IssueType, UnhealthyReason
from .utils.codenotary import calc_checksum
from .utils.sentry import async_capture_exception
_LOGGER: logging.Logger = logging.getLogger(__name__)
@@ -150,20 +147,6 @@ class Supervisor(CoreSysAttributes):
_LOGGER.error,
) from err
# Validate
try:
await self.sys_security.verify_own_content(calc_checksum(data))
except CodeNotaryUntrusted as err:
raise SupervisorAppArmorError(
"Content-Trust is broken for the AppArmor profile fetch!",
_LOGGER.critical,
) from err
except CodeNotaryError as err:
raise SupervisorAppArmorError(
f"CodeNotary error while processing AppArmor fetch: {err!s}",
_LOGGER.error,
) from err
# Load
temp_dir: TemporaryDirectory | None = None
@@ -273,13 +256,6 @@ class Supervisor(CoreSysAttributes):
"""
return self.instance.logs()
def check_trust(self) -> Awaitable[None]:
"""Calculate Supervisor docker content trust.
Return Coroutine.
"""
return self.instance.check_trust()
async def stats(self) -> DockerStats:
"""Return stats of Supervisor."""
try:

View File

@@ -31,14 +31,8 @@ from .const import (
UpdateChannel,
)
from .coresys import CoreSys, CoreSysAttributes
from .exceptions import (
CodeNotaryError,
CodeNotaryUntrusted,
UpdaterError,
UpdaterJobError,
)
from .exceptions import UpdaterError, UpdaterJobError
from .jobs.decorator import Job, JobCondition
from .utils.codenotary import calc_checksum
from .utils.common import FileConfiguration
from .validate import SCHEMA_UPDATER_CONFIG
@@ -248,9 +242,10 @@ class Updater(FileConfiguration, CoreSysAttributes):
@Job(
name="updater_fetch_data",
conditions=[
JobCondition.ARCHITECTURE_SUPPORTED,
JobCondition.INTERNET_SYSTEM,
JobCondition.OS_SUPPORTED,
JobCondition.HOME_ASSISTANT_CORE_SUPPORTED,
JobCondition.OS_SUPPORTED,
],
on_condition=UpdaterJobError,
throttle_period=timedelta(seconds=30),
@@ -289,19 +284,6 @@ class Updater(FileConfiguration, CoreSysAttributes):
self.sys_bus.remove_listener(self._connectivity_listener)
self._connectivity_listener = None
# Validate
try:
await self.sys_security.verify_own_content(calc_checksum(data))
except CodeNotaryUntrusted as err:
raise UpdaterError(
"Content-Trust is broken for the version file fetch!", _LOGGER.critical
) from err
except CodeNotaryError as err:
raise UpdaterError(
f"CodeNotary error while processing version fetch: {err!s}",
_LOGGER.error,
) from err
# Parse data
try:
data = json.loads(data)

View File

@@ -1,109 +0,0 @@
"""Small wrapper for CodeNotary."""
from __future__ import annotations
import asyncio
import hashlib
import json
import logging
from pathlib import Path
import shlex
from typing import Final
from dirhash import dirhash
from ..exceptions import CodeNotaryBackendError, CodeNotaryError, CodeNotaryUntrusted
from . import clean_env
_LOGGER: logging.Logger = logging.getLogger(__name__)
_CAS_CMD: str = (
"cas authenticate --signerID {signer} --silent --output json --hash {sum}"
)
_CACHE: set[tuple[str, str]] = set()
_ATTR_ERROR: Final = "error"
_ATTR_STATUS: Final = "status"
_FALLBACK_ERROR: Final = "Unknown CodeNotary backend issue"
def calc_checksum(data: str | bytes) -> str:
"""Generate checksum for CodeNotary."""
if isinstance(data, str):
return hashlib.sha256(data.encode()).hexdigest()
return hashlib.sha256(data).hexdigest()
def calc_checksum_path_sourcecode(folder: Path) -> str:
"""Calculate checksum for a path source code.
Need catch OSError.
"""
return dirhash(folder.as_posix(), "sha256", match=["*.py"])
# pylint: disable=unreachable
async def cas_validate(
signer: str,
checksum: str,
) -> None:
"""Validate data against CodeNotary."""
return
if (checksum, signer) in _CACHE:
return
# Generate command for request
command = shlex.split(_CAS_CMD.format(signer=signer, sum=checksum))
# Request notary authorization
_LOGGER.debug("Send cas command: %s", command)
try:
proc = await asyncio.create_subprocess_exec(
*command,
stdin=asyncio.subprocess.DEVNULL,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=clean_env(),
)
async with asyncio.timeout(15):
data, error = await proc.communicate()
except TimeoutError:
raise CodeNotaryBackendError(
"Timeout while processing CodeNotary", _LOGGER.warning
) from None
except OSError as err:
raise CodeNotaryError(
f"CodeNotary fatal error: {err!s}", _LOGGER.critical
) from err
# Check if Notarized
if proc.returncode != 0 and not data:
if error:
try:
error = error.decode("utf-8")
except UnicodeDecodeError as err:
raise CodeNotaryBackendError(_FALLBACK_ERROR, _LOGGER.warning) from err
if "not notarized" in error:
raise CodeNotaryUntrusted()
else:
error = _FALLBACK_ERROR
raise CodeNotaryBackendError(error, _LOGGER.warning)
# Parse data
try:
data_json = json.loads(data)
_LOGGER.debug("CodeNotary response with: %s", data_json)
except (json.JSONDecodeError, UnicodeDecodeError) as err:
raise CodeNotaryError(
f"Can't parse CodeNotary output: {data!s} - {err!s}", _LOGGER.error
) from err
if _ATTR_ERROR in data_json:
raise CodeNotaryBackendError(data_json[_ATTR_ERROR], _LOGGER.warning)
if data_json[_ATTR_STATUS] == 0:
_CACHE.add((checksum, signer))
else:
raise CodeNotaryUntrusted()

View File

@@ -12,7 +12,6 @@ from .const import (
ATTR_AUTO_UPDATE,
ATTR_CHANNEL,
ATTR_CLI,
ATTR_CONTENT_TRUST,
ATTR_COUNTRY,
ATTR_DEBUG,
ATTR_DEBUG_BLOCK,
@@ -229,7 +228,6 @@ SCHEMA_INGRESS_CONFIG = vol.Schema(
# pylint: disable=no-value-for-parameter
SCHEMA_SECURITY_CONFIG = vol.Schema(
{
vol.Optional(ATTR_CONTENT_TRUST, default=True): vol.Boolean(),
vol.Optional(ATTR_PWNED, default=True): vol.Boolean(),
vol.Optional(ATTR_FORCE_SECURITY, default=False): vol.Boolean(),
},

View File

@@ -17,16 +17,6 @@ async def test_api_security_options_force_security(api_client, coresys: CoreSys)
assert coresys.security.force
@pytest.mark.asyncio
async def test_api_security_options_content_trust(api_client, coresys: CoreSys):
"""Test security options content trust."""
assert coresys.security.content_trust
await api_client.post("/security/options", json={"content_trust": False})
assert not coresys.security.content_trust
@pytest.mark.asyncio
async def test_api_security_options_pwned(api_client, coresys: CoreSys):
"""Test security options pwned."""
@@ -41,11 +31,8 @@ async def test_api_security_options_pwned(api_client, coresys: CoreSys):
async def test_api_integrity_check(
api_client, coresys: CoreSys, supervisor_internet: AsyncMock
):
"""Test security integrity check."""
coresys.security.content_trust = False
"""Test security integrity check - now deprecated."""
resp = await api_client.post("/security/integrity")
result = await resp.json()
assert result["data"]["core"] == "untested"
assert result["data"]["supervisor"] == "untested"
# CodeNotary integrity check has been removed, should return 410 Gone
assert resp.status == 410

View File

@@ -31,15 +31,6 @@ from supervisor.jobs import JobSchedulerOptions, SupervisorJob
from tests.common import load_json_fixture
@pytest.fixture(autouse=True)
def mock_verify_content(coresys: CoreSys):
"""Mock verify_content utility during tests."""
with patch.object(
coresys.security, "verify_content", return_value=None
) as verify_content:
yield verify_content
@pytest.mark.parametrize(
"cpu_arch, platform",
[

View File

@@ -376,3 +376,14 @@ async def test_try_get_nvme_life_time_missing_percent_used(
coresys.config.path_supervisor
)
assert lifetime is None
async def test_try_get_nvme_life_time_dbus_not_connected(coresys: CoreSys):
"""Test getting lifetime info from an NVMe when DBUS is not connected."""
# Set the dbus for udisks2 bus to be None, to make it forcibly disconnected.
coresys.dbus.udisks2.dbus = None
lifetime = await coresys.hardware.disk.get_disk_life_time(
coresys.config.path_supervisor
)
assert lifetime is None

View File

@@ -7,8 +7,8 @@ import pytest
from supervisor.coresys import CoreSys
from supervisor.dbus.const import DeviceType
from supervisor.host.configuration import Interface, VlanConfig
from supervisor.host.const import InterfaceType
from supervisor.host.configuration import Interface, VlanConfig, WifiConfig
from supervisor.host.const import AuthMethod, InterfaceType, WifiMode
from tests.dbus_service_mocks.base import DBusServiceMock
from tests.dbus_service_mocks.network_connection_settings import (
@@ -291,3 +291,237 @@ async def test_equals_dbus_interface_eth0_10_real(
# Test should pass with matching VLAN config
assert test_vlan_interface.equals_dbus_interface(network_interface) is True
def test_map_nm_wifi_non_wireless_interface():
"""Test _map_nm_wifi returns None for non-wireless interface."""
# Mock non-wireless interface
mock_interface = Mock()
mock_interface.type = DeviceType.ETHERNET
mock_interface.settings = Mock()
result = Interface._map_nm_wifi(mock_interface)
assert result is None
def test_map_nm_wifi_no_settings():
"""Test _map_nm_wifi returns None when interface has no settings."""
# Mock wireless interface without settings
mock_interface = Mock()
mock_interface.type = DeviceType.WIRELESS
mock_interface.settings = None
result = Interface._map_nm_wifi(mock_interface)
assert result is None
def test_map_nm_wifi_open_authentication():
"""Test _map_nm_wifi with open authentication (no security)."""
# Mock wireless interface with open authentication
mock_interface = Mock()
mock_interface.type = DeviceType.WIRELESS
mock_interface.settings = Mock()
mock_interface.settings.wireless_security = None
mock_interface.settings.wireless = Mock()
mock_interface.settings.wireless.ssid = "TestSSID"
mock_interface.settings.wireless.mode = "infrastructure"
mock_interface.wireless = None
mock_interface.interface_name = "wlan0"
result = Interface._map_nm_wifi(mock_interface)
assert result is not None
assert isinstance(result, WifiConfig)
assert result.mode == WifiMode.INFRASTRUCTURE
assert result.ssid == "TestSSID"
assert result.auth == AuthMethod.OPEN
assert result.psk is None
assert result.signal is None
def test_map_nm_wifi_wep_authentication():
"""Test _map_nm_wifi with WEP authentication."""
# Mock wireless interface with WEP authentication
mock_interface = Mock()
mock_interface.type = DeviceType.WIRELESS
mock_interface.settings = Mock()
mock_interface.settings.wireless_security = Mock()
mock_interface.settings.wireless_security.key_mgmt = "none"
mock_interface.settings.wireless_security.psk = None
mock_interface.settings.wireless = Mock()
mock_interface.settings.wireless.ssid = "WEPNetwork"
mock_interface.settings.wireless.mode = "infrastructure"
mock_interface.wireless = None
mock_interface.interface_name = "wlan0"
result = Interface._map_nm_wifi(mock_interface)
assert result is not None
assert isinstance(result, WifiConfig)
assert result.auth == AuthMethod.WEP
assert result.ssid == "WEPNetwork"
assert result.psk is None
def test_map_nm_wifi_wpa_psk_authentication():
"""Test _map_nm_wifi with WPA-PSK authentication."""
# Mock wireless interface with WPA-PSK authentication
mock_interface = Mock()
mock_interface.type = DeviceType.WIRELESS
mock_interface.settings = Mock()
mock_interface.settings.wireless_security = Mock()
mock_interface.settings.wireless_security.key_mgmt = "wpa-psk"
mock_interface.settings.wireless_security.psk = "SecretPassword123"
mock_interface.settings.wireless = Mock()
mock_interface.settings.wireless.ssid = "SecureNetwork"
mock_interface.settings.wireless.mode = "infrastructure"
mock_interface.wireless = None
mock_interface.interface_name = "wlan0"
result = Interface._map_nm_wifi(mock_interface)
assert result is not None
assert isinstance(result, WifiConfig)
assert result.auth == AuthMethod.WPA_PSK
assert result.ssid == "SecureNetwork"
assert result.psk == "SecretPassword123"
def test_map_nm_wifi_unsupported_authentication():
"""Test _map_nm_wifi returns None for unsupported authentication method."""
# Mock wireless interface with unsupported authentication
mock_interface = Mock()
mock_interface.type = DeviceType.WIRELESS
mock_interface.settings = Mock()
mock_interface.settings.wireless_security = Mock()
mock_interface.settings.wireless_security.key_mgmt = "wpa-eap" # Unsupported
mock_interface.settings.wireless = Mock()
mock_interface.settings.wireless.ssid = "EnterpriseNetwork"
mock_interface.interface_name = "wlan0"
result = Interface._map_nm_wifi(mock_interface)
assert result is None
def test_map_nm_wifi_different_modes():
"""Test _map_nm_wifi with different wifi modes."""
modes_to_test = [
("infrastructure", WifiMode.INFRASTRUCTURE),
("mesh", WifiMode.MESH),
("adhoc", WifiMode.ADHOC),
("ap", WifiMode.AP),
]
for mode_value, expected_mode in modes_to_test:
mock_interface = Mock()
mock_interface.type = DeviceType.WIRELESS
mock_interface.settings = Mock()
mock_interface.settings.wireless_security = None
mock_interface.settings.wireless = Mock()
mock_interface.settings.wireless.ssid = "TestSSID"
mock_interface.settings.wireless.mode = mode_value
mock_interface.wireless = None
mock_interface.interface_name = "wlan0"
result = Interface._map_nm_wifi(mock_interface)
assert result is not None
assert result.mode == expected_mode
def test_map_nm_wifi_with_signal():
"""Test _map_nm_wifi with wireless signal strength."""
# Mock wireless interface with active connection and signal
mock_interface = Mock()
mock_interface.type = DeviceType.WIRELESS
mock_interface.settings = Mock()
mock_interface.settings.wireless_security = None
mock_interface.settings.wireless = Mock()
mock_interface.settings.wireless.ssid = "TestSSID"
mock_interface.settings.wireless.mode = "infrastructure"
mock_interface.wireless = Mock()
mock_interface.wireless.active = Mock()
mock_interface.wireless.active.strength = 75
mock_interface.interface_name = "wlan0"
result = Interface._map_nm_wifi(mock_interface)
assert result is not None
assert result.signal == 75
def test_map_nm_wifi_without_signal():
"""Test _map_nm_wifi without wireless signal (no active connection)."""
# Mock wireless interface without active connection
mock_interface = Mock()
mock_interface.type = DeviceType.WIRELESS
mock_interface.settings = Mock()
mock_interface.settings.wireless_security = None
mock_interface.settings.wireless = Mock()
mock_interface.settings.wireless.ssid = "TestSSID"
mock_interface.settings.wireless.mode = "infrastructure"
mock_interface.wireless = None
mock_interface.interface_name = "wlan0"
result = Interface._map_nm_wifi(mock_interface)
assert result is not None
assert result.signal is None
def test_map_nm_wifi_wireless_no_active_ap():
"""Test _map_nm_wifi with wireless object but no active access point."""
# Mock wireless interface with wireless object but no active AP
mock_interface = Mock()
mock_interface.type = DeviceType.WIRELESS
mock_interface.settings = Mock()
mock_interface.settings.wireless_security = None
mock_interface.settings.wireless = Mock()
mock_interface.settings.wireless.ssid = "TestSSID"
mock_interface.settings.wireless.mode = "infrastructure"
mock_interface.wireless = Mock()
mock_interface.wireless.active = None
mock_interface.interface_name = "wlan0"
result = Interface._map_nm_wifi(mock_interface)
assert result is not None
assert result.signal is None
def test_map_nm_wifi_no_wireless_settings():
"""Test _map_nm_wifi when wireless settings are missing."""
# Mock wireless interface without wireless settings
mock_interface = Mock()
mock_interface.type = DeviceType.WIRELESS
mock_interface.settings = Mock()
mock_interface.settings.wireless_security = None
mock_interface.settings.wireless = None
mock_interface.wireless = None
mock_interface.interface_name = "wlan0"
result = Interface._map_nm_wifi(mock_interface)
assert result is not None
assert result.ssid == ""
assert result.mode == WifiMode.INFRASTRUCTURE # Default mode
def test_map_nm_wifi_no_wireless_mode():
"""Test _map_nm_wifi when wireless mode is not specified."""
# Mock wireless interface without mode specified
mock_interface = Mock()
mock_interface.type = DeviceType.WIRELESS
mock_interface.settings = Mock()
mock_interface.settings.wireless_security = None
mock_interface.settings.wireless = Mock()
mock_interface.settings.wireless.ssid = "TestSSID"
mock_interface.settings.wireless.mode = None
mock_interface.wireless = None
mock_interface.interface_name = "wlan0"
result = Interface._map_nm_wifi(mock_interface)
assert result is not None
assert result.mode == WifiMode.INFRASTRUCTURE # Default mode

View File

@@ -181,7 +181,6 @@ async def test_reload_updater_triggers_supervisor_update(
"""Test an updater reload triggers a supervisor update if there is one."""
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
await coresys.core.set_state(CoreState.RUNNING)
coresys.security.content_trust = False
with (
patch.object(

View File

@@ -17,7 +17,6 @@ from supervisor.exceptions import (
AudioJobError,
CliError,
CliJobError,
CodeNotaryUntrusted,
CoreDNSError,
CoreDNSJobError,
DockerError,
@@ -337,14 +336,12 @@ async def test_repair_failed(
patch.object(
DockerInterface, "arch", new=PropertyMock(return_value=CpuArch.AMD64)
),
patch(
"supervisor.security.module.cas_validate", side_effect=CodeNotaryUntrusted
),
patch.object(DockerInterface, "install", side_effect=DockerError),
):
await plugin.repair()
capture_exception.assert_called_once()
assert check_exception_chain(capture_exception.call_args[0][0], CodeNotaryUntrusted)
assert check_exception_chain(capture_exception.call_args[0][0], DockerError)
@pytest.mark.parametrize(

View File

@@ -51,7 +51,6 @@ async def test_if_check_make_issue(coresys: CoreSys):
"""Test check for setup."""
free_space = Issue(IssueType.FREE_SPACE, ContextType.SYSTEM)
await coresys.core.set_state(CoreState.RUNNING)
coresys.security.content_trust = False
with patch("shutil.disk_usage", return_value=(1, 1, 1)):
await coresys.resolution.check.check_system()
@@ -63,7 +62,6 @@ async def test_if_check_cleanup_issue(coresys: CoreSys):
"""Test check for setup."""
free_space = Issue(IssueType.FREE_SPACE, ContextType.SYSTEM)
await coresys.core.set_state(CoreState.RUNNING)
coresys.security.content_trust = False
with patch("shutil.disk_usage", return_value=(1, 1, 1)):
await coresys.resolution.check.check_system()

View File

@@ -1,96 +0,0 @@
"""Test Check Supervisor trust."""
# pylint: disable=import-error,protected-access
from unittest.mock import AsyncMock, patch
from supervisor.const import CoreState
from supervisor.coresys import CoreSys
from supervisor.exceptions import CodeNotaryError, CodeNotaryUntrusted
from supervisor.resolution.checks.supervisor_trust import CheckSupervisorTrust
from supervisor.resolution.const import IssueType, UnhealthyReason
async def test_base(coresys: CoreSys):
"""Test check basics."""
supervisor_trust = CheckSupervisorTrust(coresys)
assert supervisor_trust.slug == "supervisor_trust"
assert supervisor_trust.enabled
async def test_check(coresys: CoreSys):
"""Test check."""
supervisor_trust = CheckSupervisorTrust(coresys)
await coresys.core.set_state(CoreState.RUNNING)
assert len(coresys.resolution.issues) == 0
coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryError)
await supervisor_trust.run_check()
assert coresys.supervisor.check_trust.called
coresys.supervisor.check_trust = AsyncMock(return_value=None)
await supervisor_trust.run_check()
assert coresys.supervisor.check_trust.called
assert len(coresys.resolution.issues) == 0
coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
await supervisor_trust.run_check()
assert coresys.supervisor.check_trust.called
assert len(coresys.resolution.issues) == 1
assert coresys.resolution.issues[-1].type == IssueType.TRUST
assert UnhealthyReason.UNTRUSTED in coresys.resolution.unhealthy
async def test_approve(coresys: CoreSys):
"""Test check."""
supervisor_trust = CheckSupervisorTrust(coresys)
await coresys.core.set_state(CoreState.RUNNING)
coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
assert await supervisor_trust.approve_check()
coresys.supervisor.check_trust = AsyncMock(return_value=None)
assert not await supervisor_trust.approve_check()
async def test_with_global_disable(coresys: CoreSys, caplog):
"""Test when pwned is globally disabled."""
coresys.security.content_trust = False
supervisor_trust = CheckSupervisorTrust(coresys)
await coresys.core.set_state(CoreState.RUNNING)
assert len(coresys.resolution.issues) == 0
coresys.security.verify_own_content = AsyncMock(side_effect=CodeNotaryUntrusted)
await supervisor_trust.run_check()
assert not coresys.security.verify_own_content.called
assert (
"Skipping supervisor_trust, content_trust is globally disabled" in caplog.text
)
async def test_did_run(coresys: CoreSys):
"""Test that the check ran as expected."""
supervisor_trust = CheckSupervisorTrust(coresys)
should_run = supervisor_trust.states
should_not_run = [state for state in CoreState if state not in should_run]
assert len(should_run) != 0
assert len(should_not_run) != 0
with patch(
"supervisor.resolution.checks.supervisor_trust.CheckSupervisorTrust.run_check",
return_value=None,
) as check:
for state in should_run:
await coresys.core.set_state(state)
await supervisor_trust()
check.assert_called_once()
check.reset_mock()
for state in should_not_run:
await coresys.core.set_state(state)
await supervisor_trust()
check.assert_not_called()
check.reset_mock()

View File

@@ -1,46 +0,0 @@
"""Test evaluation base."""
# pylint: disable=import-error,protected-access
from unittest.mock import patch
from supervisor.const import CoreState
from supervisor.coresys import CoreSys
from supervisor.resolution.evaluations.content_trust import EvaluateContentTrust
async def test_evaluation(coresys: CoreSys):
"""Test evaluation."""
job_conditions = EvaluateContentTrust(coresys)
await coresys.core.set_state(CoreState.SETUP)
await job_conditions()
assert job_conditions.reason not in coresys.resolution.unsupported
coresys.security.content_trust = False
await job_conditions()
assert job_conditions.reason in coresys.resolution.unsupported
async def test_did_run(coresys: CoreSys):
"""Test that the evaluation ran as expected."""
job_conditions = EvaluateContentTrust(coresys)
should_run = job_conditions.states
should_not_run = [state for state in CoreState if state not in should_run]
assert len(should_run) != 0
assert len(should_not_run) != 0
with patch(
"supervisor.resolution.evaluations.content_trust.EvaluateContentTrust.evaluate",
return_value=None,
) as evaluate:
for state in should_run:
await coresys.core.set_state(state)
await job_conditions()
evaluate.assert_called_once()
evaluate.reset_mock()
for state in should_not_run:
await coresys.core.set_state(state)
await job_conditions()
evaluate.assert_not_called()
evaluate.reset_mock()

View File

@@ -1,89 +0,0 @@
"""Test evaluation base."""
# pylint: disable=import-error,protected-access
import errno
import os
from pathlib import Path
from unittest.mock import AsyncMock, patch
from supervisor.const import CoreState
from supervisor.coresys import CoreSys
from supervisor.exceptions import CodeNotaryError, CodeNotaryUntrusted
from supervisor.resolution.const import ContextType, IssueType
from supervisor.resolution.data import Issue
from supervisor.resolution.evaluations.source_mods import EvaluateSourceMods
async def test_evaluation(coresys: CoreSys):
"""Test evaluation."""
with patch(
"supervisor.resolution.evaluations.source_mods._SUPERVISOR_SOURCE",
Path(f"{os.getcwd()}/supervisor"),
):
sourcemods = EvaluateSourceMods(coresys)
await coresys.core.set_state(CoreState.RUNNING)
assert sourcemods.reason not in coresys.resolution.unsupported
coresys.security.verify_own_content = AsyncMock(side_effect=CodeNotaryUntrusted)
await sourcemods()
assert sourcemods.reason in coresys.resolution.unsupported
coresys.security.verify_own_content = AsyncMock(side_effect=CodeNotaryError)
await sourcemods()
assert sourcemods.reason not in coresys.resolution.unsupported
coresys.security.verify_own_content = AsyncMock()
await sourcemods()
assert sourcemods.reason not in coresys.resolution.unsupported
async def test_did_run(coresys: CoreSys):
"""Test that the evaluation ran as expected."""
sourcemods = EvaluateSourceMods(coresys)
should_run = sourcemods.states
should_not_run = [state for state in CoreState if state not in should_run]
assert len(should_run) != 0
assert len(should_not_run) != 0
with patch(
"supervisor.resolution.evaluations.source_mods.EvaluateSourceMods.evaluate",
return_value=None,
) as evaluate:
for state in should_run:
await coresys.core.set_state(state)
await sourcemods()
evaluate.assert_called_once()
evaluate.reset_mock()
for state in should_not_run:
await coresys.core.set_state(state)
await sourcemods()
evaluate.assert_not_called()
evaluate.reset_mock()
async def test_evaluation_error(coresys: CoreSys):
"""Test error reading file during evaluation."""
sourcemods = EvaluateSourceMods(coresys)
await coresys.core.set_state(CoreState.RUNNING)
corrupt_fs = Issue(IssueType.CORRUPT_FILESYSTEM, ContextType.SYSTEM)
assert sourcemods.reason not in coresys.resolution.unsupported
assert corrupt_fs not in coresys.resolution.issues
with patch(
"supervisor.utils.codenotary.dirhash",
side_effect=(err := OSError()),
):
err.errno = errno.EBUSY
await sourcemods()
assert sourcemods.reason not in coresys.resolution.unsupported
assert corrupt_fs in coresys.resolution.issues
assert coresys.core.healthy is True
coresys.resolution.dismiss_issue(corrupt_fs)
err.errno = errno.EBADMSG
await sourcemods()
assert sourcemods.reason not in coresys.resolution.unsupported
assert corrupt_fs in coresys.resolution.issues
assert coresys.core.healthy is False

View File

@@ -0,0 +1,43 @@
"""Test evaluation supported system architectures."""
from unittest.mock import PropertyMock, patch
import pytest
from supervisor.const import CoreState
from supervisor.coresys import CoreSys
from supervisor.resolution.evaluations.system_architecture import (
EvaluateSystemArchitecture,
)
@pytest.mark.parametrize("arch", ["i386", "armhf", "armv7"])
async def test_evaluation_unsupported_architectures(
coresys: CoreSys,
arch: str,
):
"""Test evaluation of unsupported system architectures."""
system_architecture = EvaluateSystemArchitecture(coresys)
await coresys.core.set_state(CoreState.INITIALIZE)
with patch.object(
type(coresys.supervisor), "arch", PropertyMock(return_value=arch)
):
await system_architecture()
assert system_architecture.reason in coresys.resolution.unsupported
@pytest.mark.parametrize("arch", ["amd64", "aarch64"])
async def test_evaluation_supported_architectures(
coresys: CoreSys,
arch: str,
):
"""Test evaluation of supported system architectures."""
system_architecture = EvaluateSystemArchitecture(coresys)
await coresys.core.set_state(CoreState.INITIALIZE)
with patch.object(
type(coresys.supervisor), "arch", PropertyMock(return_value=arch)
):
await system_architecture()
assert system_architecture.reason not in coresys.resolution.unsupported

View File

@@ -1,69 +0,0 @@
"""Test evaluation base."""
# pylint: disable=import-error,protected-access
from datetime import timedelta
from unittest.mock import AsyncMock
import time_machine
from supervisor.coresys import CoreSys
from supervisor.resolution.const import ContextType, IssueType, SuggestionType
from supervisor.resolution.data import Issue, Suggestion
from supervisor.resolution.fixups.system_execute_integrity import (
FixupSystemExecuteIntegrity,
)
from supervisor.security.const import ContentTrustResult, IntegrityResult
from supervisor.utils.dt import utcnow
async def test_fixup(coresys: CoreSys, supervisor_internet: AsyncMock):
"""Test fixup."""
system_execute_integrity = FixupSystemExecuteIntegrity(coresys)
assert system_execute_integrity.auto
coresys.resolution.add_suggestion(
Suggestion(SuggestionType.EXECUTE_INTEGRITY, ContextType.SYSTEM)
)
coresys.resolution.add_issue(Issue(IssueType.TRUST, ContextType.SYSTEM))
coresys.security.integrity_check = AsyncMock(
return_value=IntegrityResult(
ContentTrustResult.PASS,
ContentTrustResult.PASS,
{"audio": ContentTrustResult.PASS},
)
)
await system_execute_integrity()
assert coresys.security.integrity_check.called
assert len(coresys.resolution.suggestions) == 0
assert len(coresys.resolution.issues) == 0
async def test_fixup_error(coresys: CoreSys, supervisor_internet: AsyncMock):
"""Test fixup."""
system_execute_integrity = FixupSystemExecuteIntegrity(coresys)
assert system_execute_integrity.auto
coresys.resolution.add_suggestion(
Suggestion(SuggestionType.EXECUTE_INTEGRITY, ContextType.SYSTEM)
)
coresys.resolution.add_issue(Issue(IssueType.TRUST, ContextType.SYSTEM))
coresys.security.integrity_check = AsyncMock(
return_value=IntegrityResult(
ContentTrustResult.FAILED,
ContentTrustResult.PASS,
{"audio": ContentTrustResult.PASS},
)
)
with time_machine.travel(utcnow() + timedelta(hours=24)):
await system_execute_integrity()
assert coresys.security.integrity_check.called
assert len(coresys.resolution.suggestions) == 1
assert len(coresys.resolution.issues) == 1

View File

@@ -1,21 +1,15 @@
"""Test evaluations."""
from unittest.mock import Mock, patch
from unittest.mock import Mock
from supervisor.const import CoreState
from supervisor.coresys import CoreSys
from supervisor.utils import check_exception_chain
async def test_evaluate_system_error(coresys: CoreSys, capture_exception: Mock):
"""Test error while evaluating system."""
await coresys.core.set_state(CoreState.RUNNING)
with patch(
"supervisor.resolution.evaluations.source_mods.calc_checksum_path_sourcecode",
side_effect=RuntimeError,
):
await coresys.resolution.evaluate.evaluate_system()
await coresys.resolution.evaluate.evaluate_system()
capture_exception.assert_called_once()
assert check_exception_chain(capture_exception.call_args[0][0], RuntimeError)
capture_exception.assert_not_called()

View File

@@ -1,127 +0,0 @@
"""Testing handling with Security."""
from unittest.mock import AsyncMock, patch
import pytest
from supervisor.coresys import CoreSys
from supervisor.exceptions import CodeNotaryError, CodeNotaryUntrusted
from supervisor.security.const import ContentTrustResult
async def test_content_trust(coresys: CoreSys):
"""Test Content-Trust."""
with patch("supervisor.security.module.cas_validate", AsyncMock()) as cas_validate:
await coresys.security.verify_content("test@mail.com", "ffffffffffffff")
assert cas_validate.called
cas_validate.assert_called_once_with("test@mail.com", "ffffffffffffff")
with patch(
"supervisor.security.module.cas_validate", AsyncMock()
) as cas_validate:
await coresys.security.verify_own_content("ffffffffffffff")
assert cas_validate.called
cas_validate.assert_called_once_with(
"notary@home-assistant.io", "ffffffffffffff"
)
async def test_disabled_content_trust(coresys: CoreSys):
"""Test Content-Trust."""
coresys.security.content_trust = False
with patch("supervisor.security.module.cas_validate", AsyncMock()) as cas_validate:
await coresys.security.verify_content("test@mail.com", "ffffffffffffff")
assert not cas_validate.called
with patch("supervisor.security.module.cas_validate", AsyncMock()) as cas_validate:
await coresys.security.verify_own_content("ffffffffffffff")
assert not cas_validate.called
async def test_force_content_trust(coresys: CoreSys):
"""Force Content-Trust tests."""
with patch(
"supervisor.security.module.cas_validate",
AsyncMock(side_effect=CodeNotaryError),
) as cas_validate:
await coresys.security.verify_content("test@mail.com", "ffffffffffffff")
assert cas_validate.called
cas_validate.assert_called_once_with("test@mail.com", "ffffffffffffff")
coresys.security.force = True
with (
patch(
"supervisor.security.module.cas_validate",
AsyncMock(side_effect=CodeNotaryError),
) as cas_validate,
pytest.raises(CodeNotaryError),
):
await coresys.security.verify_content("test@mail.com", "ffffffffffffff")
async def test_integrity_check_disabled(coresys: CoreSys):
"""Test integrity check with disabled content trust."""
coresys.security.content_trust = False
result = await coresys.security.integrity_check.__wrapped__(coresys.security)
assert result.core == ContentTrustResult.UNTESTED
assert result.supervisor == ContentTrustResult.UNTESTED
async def test_integrity_check(coresys: CoreSys, install_addon_ssh):
"""Test integrity check with content trust."""
coresys.homeassistant.core.check_trust = AsyncMock()
coresys.supervisor.check_trust = AsyncMock()
install_addon_ssh.check_trust = AsyncMock()
install_addon_ssh.data["codenotary"] = "test@example.com"
result = await coresys.security.integrity_check.__wrapped__(coresys.security)
assert result.core == ContentTrustResult.PASS
assert result.supervisor == ContentTrustResult.PASS
assert result.addons[install_addon_ssh.slug] == ContentTrustResult.PASS
async def test_integrity_check_error(coresys: CoreSys, install_addon_ssh):
"""Test integrity check with content trust issues."""
coresys.homeassistant.core.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
install_addon_ssh.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
install_addon_ssh.data["codenotary"] = "test@example.com"
result = await coresys.security.integrity_check.__wrapped__(coresys.security)
assert result.core == ContentTrustResult.ERROR
assert result.supervisor == ContentTrustResult.ERROR
assert result.addons[install_addon_ssh.slug] == ContentTrustResult.ERROR
async def test_integrity_check_failed(coresys: CoreSys, install_addon_ssh):
"""Test integrity check with content trust failed."""
coresys.homeassistant.core.check_trust = AsyncMock(side_effect=CodeNotaryError)
coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryError)
install_addon_ssh.check_trust = AsyncMock(side_effect=CodeNotaryError)
install_addon_ssh.data["codenotary"] = "test@example.com"
result = await coresys.security.integrity_check.__wrapped__(coresys.security)
assert result.core == ContentTrustResult.FAILED
assert result.supervisor == ContentTrustResult.FAILED
assert result.addons[install_addon_ssh.slug] == ContentTrustResult.FAILED
async def test_integrity_check_addon(coresys: CoreSys, install_addon_ssh):
"""Test integrity check with content trust but no signed add-ons."""
coresys.homeassistant.core.check_trust = AsyncMock()
coresys.supervisor.check_trust = AsyncMock()
result = await coresys.security.integrity_check.__wrapped__(coresys.security)
assert result.core == ContentTrustResult.PASS
assert result.supervisor == ContentTrustResult.PASS
assert result.addons[install_addon_ssh.slug] == ContentTrustResult.UNTESTED

View File

@@ -86,10 +86,9 @@ async def test_os_update_path(
"""Test OS upgrade path across major versions."""
coresys.os._board = "rpi4" # pylint: disable=protected-access
coresys.os._version = AwesomeVersion(version) # pylint: disable=protected-access
with patch.object(type(coresys.security), "verify_own_content"):
await coresys.updater.fetch_data()
await coresys.updater.fetch_data()
assert coresys.updater.version_hassos == AwesomeVersion(expected)
assert coresys.updater.version_hassos == AwesomeVersion(expected)
@pytest.mark.usefixtures("no_job_throttle")
@@ -105,7 +104,6 @@ async def test_delayed_fetch_for_connectivity(
load_binary_fixture("version_stable.json")
)
coresys.websession.head = AsyncMock()
coresys.security.verify_own_content = AsyncMock()
# Network connectivity change causes a series of async tasks to eventually do a version fetch
# Rather then use some kind of sleep loop, set up listener for start of fetch data job

View File

@@ -1,128 +0,0 @@
"""Test CodeNotary."""
from __future__ import annotations
from dataclasses import dataclass
from unittest.mock import AsyncMock, Mock, patch
import pytest
from supervisor.exceptions import (
CodeNotaryBackendError,
CodeNotaryError,
CodeNotaryUntrusted,
)
from supervisor.utils.codenotary import calc_checksum, cas_validate
pytest.skip("code notary has been disabled due to issues", allow_module_level=True)
@dataclass
class SubprocessResponse:
"""Class for specifying subprocess exec response."""
returncode: int = 0
data: str = ""
error: str | None = None
exception: Exception | None = None
@pytest.fixture(name="subprocess_exec")
def fixture_subprocess_exec(request):
"""Mock subprocess exec with specific return."""
response = request.param
if response.exception:
communicate_return = AsyncMock(side_effect=response.exception)
else:
communicate_return = AsyncMock(return_value=(response.data, response.error))
exec_return = Mock(returncode=response.returncode, communicate=communicate_return)
with patch(
"supervisor.utils.codenotary.asyncio.create_subprocess_exec",
return_value=exec_return,
) as subprocess_exec:
yield subprocess_exec
def test_checksum_calc():
"""Calc Checkusm as test."""
assert calc_checksum("test") == calc_checksum(b"test")
assert (
calc_checksum("test")
== "9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08"
)
async def test_valid_checksum():
"""Test a valid autorization."""
await cas_validate(
"notary@home-assistant.io",
"4434a33ff9c695e870bc5bbe04230ea3361ecf4c129eb06133dd1373975a43f0",
)
async def test_invalid_checksum():
"""Test a invalid autorization."""
with pytest.raises(CodeNotaryUntrusted):
await cas_validate(
"notary@home-assistant.io",
"ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
)
@pytest.mark.parametrize(
"subprocess_exec",
[SubprocessResponse(returncode=1, error=b"x is not notarized")],
)
async def test_not_notarized_error(subprocess_exec):
"""Test received a not notarized error response from command."""
with pytest.raises(CodeNotaryUntrusted):
await cas_validate(
"notary@home-assistant.io",
"ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
)
@pytest.mark.parametrize(
"subprocess_exec",
[
SubprocessResponse(returncode=1, error=b"test"),
SubprocessResponse(returncode=0, data='{"error":"asn1: structure error"}'),
SubprocessResponse(returncode=1, error="test".encode("utf-16")),
],
indirect=True,
)
async def test_cas_backend_error(subprocess_exec):
"""Test backend error executing cas command."""
with pytest.raises(CodeNotaryBackendError):
await cas_validate(
"notary@home-assistant.io",
"ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
)
@pytest.mark.parametrize(
"subprocess_exec",
[SubprocessResponse(returncode=0, data='{"status":1}')],
indirect=True,
)
async def test_cas_notarized_untrusted(subprocess_exec):
"""Test cas found notarized but untrusted content."""
with pytest.raises(CodeNotaryUntrusted):
await cas_validate(
"notary@home-assistant.io",
"ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
)
@pytest.mark.parametrize(
"subprocess_exec", [SubprocessResponse(exception=OSError())], indirect=True
)
async def test_cas_exec_os_error(subprocess_exec):
"""Test os error attempting to execute cas command."""
with pytest.raises(CodeNotaryError):
await cas_validate(
"notary@home-assistant.io",
"ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
)