Compare commits


3 Commits

Author SHA1 Message Date
Stefan Agner
5d9d33c9fa Wait for Home Assistant Core to start 2025-10-27 12:36:48 +01:00
Stefan Agner
e4959b4f10 Log rejected WebSocket messages to Home Assistant Core 2025-10-27 11:46:04 +01:00
Stefan Agner
78353220de Reject Core Backup if sending backup start WebSocket message fails
If sending the WebSocket message to Home Assistant Core to inform it
about the start of a backup fails, we should reject the backup process
entirely. This prevents taking a backup without Home Assistant Core
being aware of it.

As a side effect, this likely prevents situations where Core does not
learn about the backup progress, since we also won't be able to send
WebSocket messages about Job progress updates.
2025-10-27 11:36:25 +01:00
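
As a rough illustration of the behavior described in the commit message above, the sketch below rejects a backup when the start notification to Core cannot be sent. All names here (`send_backup_start`, `BackupError`, `begin_backup`) are hypothetical stand-ins, not the Supervisor's actual API.

```python
"""Minimal sketch of the backup-start rejection described above.

All names (send_backup_start, BackupError, begin_backup) are hypothetical
stand-ins; the real Supervisor code differs.
"""
import asyncio


class BackupError(Exception):
    """Raised when a backup request must be rejected."""


async def send_backup_start() -> None:
    """Stand-in for the WebSocket message telling Core a backup starts."""
    raise ConnectionError("WebSocket send failed")


async def begin_backup() -> None:
    """Reject the backup entirely if Core cannot be informed about it."""
    try:
        await send_backup_start()
    except ConnectionError as err:
        # Without this message Core would also miss later Job progress
        # updates, so abort instead of backing up behind Core's back.
        raise BackupError("Could not notify Core about backup start") from err


if __name__ == "__main__":
    try:
        asyncio.run(begin_backup())
    except BackupError as err:
        print(f"Backup rejected: {err}")
```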
62 changed files with 1562 additions and 1582 deletions

View File

@@ -170,6 +170,8 @@ jobs:
--target /data \
--cosign \
--generic ${{ needs.init.outputs.version }}
env:
CAS_API_KEY: ${{ secrets.CAS_TOKEN }}
version:
name: Update version
@@ -291,6 +293,42 @@ jobs:
exit 1
fi
- name: Check the Supervisor code sign
if: needs.init.outputs.publish == 'true'
run: |
echo "Enable Content-Trust"
test=$(docker exec hassio_cli ha security options --content-trust=true --no-progress --raw-json | jq -r '.result')
if [ "$test" != "ok" ]; then
exit 1
fi
echo "Run supervisor health check"
test=$(docker exec hassio_cli ha resolution healthcheck --no-progress --raw-json | jq -r '.result')
if [ "$test" != "ok" ]; then
exit 1
fi
echo "Check supervisor unhealthy"
test=$(docker exec hassio_cli ha resolution info --no-progress --raw-json | jq -r '.data.unhealthy[]')
if [ "$test" != "" ]; then
exit 1
fi
echo "Check supervisor supported"
test=$(docker exec hassio_cli ha resolution info --no-progress --raw-json | jq -r '.data.unsupported[]')
if [[ "$test" =~ source_mods ]]; then
exit 1
fi
- name: Wait for Home Assistant Core to start
run: |
echo "Waiting for Home Assistant Core to start"
timeout 10m ha supervisor logs -f -n 10000 -b 0 | grep -q "Detect a running Home Assistant instance"
if [ "$?" != "0" ]; then
echo "Home Assistant Core did not start within 10 minutes"
exit 1
fi
- name: Create full backup
id: backup
run: |

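The health-check step above drives the hassio_cli container through `ha ... --raw-json` and inspects the JSON with jq. Below is a hedged Python restatement of the same checks; it assumes the same `docker exec hassio_cli` setup as the workflow and the same JSON layout (`result`, `data.unhealthy`, `data.unsupported`). It mirrors the shell step and is not part of the repository.

```python
"""Python sketch of the supervisor health checks run in the workflow above.

Assumes a running `hassio_cli` container and the JSON layout the jq
expressions rely on; this mirrors the shell step, it is not repo code.
"""
import json
import subprocess
import sys


def ha_json(*args: str) -> dict:
    """Run an `ha` command inside hassio_cli and parse its raw JSON output."""
    out = subprocess.run(
        ["docker", "exec", "hassio_cli", "ha", *args, "--no-progress", "--raw-json"],
        check=True,
        capture_output=True,
        text=True,
    ).stdout
    return json.loads(out)


def main() -> int:
    if ha_json("resolution", "healthcheck")["result"] != "ok":
        return 1
    info = ha_json("resolution", "info")["data"]
    if info.get("unhealthy"):  # supervisor must not report unhealthy reasons
        return 1
    if "source_mods" in info.get("unsupported", []):  # no modified sources
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())
```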
View File

@@ -386,7 +386,7 @@ jobs:
-o console_output_style=count \
tests
- name: Upload coverage artifact
uses: actions/upload-artifact@330a01c490aca151604b8cf639adc76d48f6c5d4 # v5.0.0
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2
with:
name: coverage
path: .coverage
@@ -417,7 +417,7 @@ jobs:
echo "Failed to restore Python virtual environment from cache"
exit 1
- name: Download all coverage artifacts
uses: actions/download-artifact@018cc2cf5baa6db3ef3c5f8a56943fffe632ef53 # v6.0.0
uses: actions/download-artifact@634f93cb2916e3fdff6788551b99b062d0335ce0 # v5.0.0
with:
name: coverage
path: coverage/

View File

@@ -12,7 +12,7 @@ jobs:
- name: Check out code from GitHub
uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
- name: Sentry Release
uses: getsentry/action-release@128c5058bbbe93c8e02147fe0a9c713f166259a6 # v3.4.0
uses: getsentry/action-release@4f502acc1df792390abe36f2dcb03612ef144818 # v3.3.0
env:
SENTRY_AUTH_TOKEN: ${{ secrets.SENTRY_AUTH_TOKEN }}
SENTRY_ORG: ${{ secrets.SENTRY_ORG }}

View File

@@ -1,6 +1,6 @@
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.14.3
rev: v0.11.10
hooks:
- id: ruff
args:

View File

@@ -1,11 +1,10 @@
aiodns==3.5.0
aiohttp==3.13.2
aiohttp==3.13.1
atomicwrites-homeassistant==1.4.1
attrs==25.4.0
awesomeversion==25.8.0
backports.zstd==1.0.0
blockbuster==1.5.25
brotli==1.2.0
brotli==1.1.0
ciso8601==2.3.3
colorlog==6.10.1
cpe==1.3.1
@@ -18,14 +17,14 @@ faust-cchardet==2.1.19
gitpython==3.1.45
jinja2==3.1.6
log-rate-limit==1.4.2
orjson==3.11.4
orjson==3.11.3
pulsectl==24.12.0
pyudev==0.24.4
PyYAML==6.0.3
requests==2.32.5
securetar==2025.2.1
sentry-sdk==2.43.0
sentry-sdk==2.42.1
setuptools==80.9.0
voluptuous==0.15.2
dbus-fast==2.45.1
dbus-fast==2.44.5
zlib-fast==0.2.1

View File

@@ -1,14 +1,14 @@
astroid==4.0.2
coverage==7.11.3
astroid==4.0.1
coverage==7.11.0
mypy==1.18.2
pre-commit==4.4.0
pre-commit==4.3.0
pylint==4.0.2
pytest-aiohttp==1.1.0
pytest-asyncio==1.2.0
pytest-asyncio==0.25.2
pytest-cov==7.0.0
pytest-timeout==2.4.0
pytest==8.4.2
ruff==0.14.4
ruff==0.14.2
time-machine==2.19.0
types-docker==7.1.0.20251009
types-pyyaml==6.0.12.20250915

View File

@@ -66,35 +66,13 @@ from ..docker.const import ContainerState
from ..docker.monitor import DockerContainerStateEvent
from ..docker.stats import DockerStats
from ..exceptions import (
AddonBackupAppArmorProfileUnknownError,
AddonBackupExportImageUnknownError,
AddonBackupMetadataInvalidError,
AddonBuildImageUnknownError,
AddonConfigurationFileUnknownError,
AddonConfigurationInvalidError,
AddonContainerRunCommandUnknownError,
AddonContainerStartUnknownError,
AddonContainerStatsUnknownError,
AddonContainerStopUnknownError,
AddonContainerWriteStdinUnknownError,
AddonCreateBackupFileUnknownError,
AddonCreateBackupMetadataFileUnknownError,
AddonExtractBackupFileUnknownError,
AddonInstallImageUnknownError,
AddonNotRunningError,
AddonConfigurationError,
AddonNotSupportedError,
AddonNotSupportedWriteStdinError,
AddonPrePostBackupCommandReturnedError,
AddonRemoveImageUnknownError,
AddonRestoreAppArmorProfileUnknownError,
AddonRestoreBackupDataUnknownError,
AddonsError,
AddonsJobError,
ConfigurationFileError,
DockerBuildError,
DockerError,
HostAppArmorError,
StoreAddonNotFoundError,
)
from ..hardware.data import Device
from ..homeassistant.const import WSEvent
@@ -257,7 +235,7 @@ class Addon(AddonModel):
await self.instance.check_image(self.version, default_image, self.arch)
except DockerError:
_LOGGER.info("No %s addon Docker image %s found", self.slug, self.image)
with suppress(DockerError, AddonNotSupportedError):
with suppress(DockerError):
await self.instance.install(self.version, default_image, arch=self.arch)
self.persist[ATTR_IMAGE] = default_image
@@ -740,16 +718,18 @@ class Addon(AddonModel):
options = self.schema.validate(self.options)
await self.sys_run_in_executor(write_json_file, self.path_options, options)
except vol.Invalid as ex:
raise AddonConfigurationInvalidError(
_LOGGER.error,
addon=self.slug,
validation_error=humanize_error(self.options, ex),
) from None
except ConfigurationFileError as err:
_LOGGER.error(
"Add-on %s has invalid options: %s",
self.slug,
humanize_error(self.options, ex),
)
except ConfigurationFileError:
_LOGGER.error("Add-on %s can't write options", self.slug)
raise AddonConfigurationFileUnknownError(addon=self.slug) from err
else:
_LOGGER.debug("Add-on %s write options: %s", self.slug, options)
return
_LOGGER.debug("Add-on %s write options: %s", self.slug, options)
raise AddonConfigurationError()
@Job(
name="addon_unload",
@@ -792,7 +772,7 @@ class Addon(AddonModel):
async def install(self) -> None:
"""Install and setup this addon."""
if not self.addon_store:
raise StoreAddonNotFoundError(addon=self.slug)
raise AddonsError("Missing from store, cannot install!")
await self.sys_addons.data.install(self.addon_store)
@@ -813,15 +793,9 @@ class Addon(AddonModel):
await self.instance.install(
self.latest_version, self.addon_store.image, arch=self.arch
)
except AddonsError:
await self.sys_addons.data.uninstall(self)
raise
except DockerBuildError as err:
await self.sys_addons.data.uninstall(self)
raise AddonBuildImageUnknownError(addon=self.slug) from err
except DockerError as err:
await self.sys_addons.data.uninstall(self)
raise AddonInstallImageUnknownError(addon=self.slug) from err
raise AddonsError() from err
# Finish initialization and set up listeners
await self.load()
@@ -845,7 +819,7 @@ class Addon(AddonModel):
try:
await self.instance.remove(remove_image=remove_image)
except DockerError as err:
raise AddonRemoveImageUnknownError(addon=self.slug) from err
raise AddonsError() from err
self.state = AddonState.UNKNOWN
@@ -910,7 +884,7 @@ class Addon(AddonModel):
if it was running. Else nothing is returned.
"""
if not self.addon_store:
raise StoreAddonNotFoundError(addon=self.slug)
raise AddonsError("Missing from store, cannot update!")
old_image = self.image
# Cache data to prevent races with other updates to global
@@ -918,10 +892,8 @@ class Addon(AddonModel):
try:
await self.instance.update(store.version, store.image, arch=self.arch)
except DockerBuildError as err:
raise AddonBuildImageUnknownError(addon=self.slug) from err
except DockerError as err:
raise AddonInstallImageUnknownError(addon=self.slug) from err
raise AddonsError() from err
# Stop the addon if running
if (last_state := self.state) in {AddonState.STARTED, AddonState.STARTUP}:
@@ -963,18 +935,12 @@ class Addon(AddonModel):
"""
last_state: AddonState = self.state
try:
# remove docker container and image but not addon config
# remove docker container but not addon config
try:
await self.instance.remove()
except DockerError as err:
raise AddonRemoveImageUnknownError(addon=self.slug) from err
try:
await self.instance.install(self.version)
except DockerBuildError as err:
raise AddonBuildImageUnknownError(addon=self.slug) from err
except DockerError as err:
raise AddonInstallImageUnknownError(addon=self.slug) from err
raise AddonsError() from err
if self.addon_store:
await self.sys_addons.data.update(self.addon_store)
@@ -1146,7 +1112,7 @@ class Addon(AddonModel):
await self.instance.run()
except DockerError as err:
self.state = AddonState.ERROR
raise AddonContainerStartUnknownError(addon=self.slug) from err
raise AddonsError() from err
return self.sys_create_task(self._wait_for_startup())
@@ -1162,7 +1128,7 @@ class Addon(AddonModel):
await self.instance.stop()
except DockerError as err:
self.state = AddonState.ERROR
raise AddonContainerStopUnknownError(addon=self.slug) from err
raise AddonsError() from err
@Job(
name="addon_restart",
@@ -1195,12 +1161,9 @@ class Addon(AddonModel):
async def stats(self) -> DockerStats:
"""Return stats of container."""
try:
if not await self.is_running():
raise AddonNotRunningError(_LOGGER.warning, addon=self.slug)
return await self.instance.stats()
except DockerError as err:
raise AddonContainerStatsUnknownError(addon=self.slug) from err
raise AddonsError() from err
@Job(
name="addon_write_stdin",
@@ -1210,15 +1173,14 @@ class Addon(AddonModel):
async def write_stdin(self, data) -> None:
"""Write data to add-on stdin."""
if not self.with_stdin:
raise AddonNotSupportedWriteStdinError(_LOGGER.error, addon=self.slug)
raise AddonNotSupportedError(
f"Add-on {self.slug} does not support writing to stdin!", _LOGGER.error
)
try:
if not await self.is_running():
raise AddonNotRunningError(_LOGGER.warning, addon=self.slug)
await self.instance.write_stdin(data)
return await self.instance.write_stdin(data)
except DockerError as err:
raise AddonContainerWriteStdinUnknownError(addon=self.slug) from err
raise AddonsError() from err
async def _backup_command(self, command: str) -> None:
try:
@@ -1227,14 +1189,15 @@ class Addon(AddonModel):
_LOGGER.debug(
"Pre-/Post backup command failed with: %s", command_return.output
)
raise AddonPrePostBackupCommandReturnedError(
_LOGGER.error, addon=self.slug, exit_code=command_return.exit_code
raise AddonsError(
f"Pre-/Post backup command returned error code: {command_return.exit_code}",
_LOGGER.error,
)
except DockerError as err:
_LOGGER.error(
"Failed running pre-/post backup command %s: %s", command, err
)
raise AddonContainerRunCommandUnknownError(addon=self.slug) from err
raise AddonsError(
f"Failed running pre-/post backup command {command}: {str(err)}",
_LOGGER.error,
) from err
@Job(
name="addon_begin_backup",
@@ -1323,17 +1286,14 @@ class Addon(AddonModel):
try:
self.instance.export_image(temp_path.joinpath("image.tar"))
except DockerError as err:
raise AddonBackupExportImageUnknownError(
addon=self.slug
) from err
raise AddonsError() from err
# Store local configs/state
try:
write_json_file(temp_path.joinpath("addon.json"), metadata)
except ConfigurationFileError as err:
_LOGGER.error("Can't save meta for %s: %s", self.slug, err)
raise AddonCreateBackupMetadataFileUnknownError(
addon=self.slug
raise AddonsError(
f"Can't save meta for {self.slug}", _LOGGER.error
) from err
# Store AppArmor Profile
@@ -1344,8 +1304,8 @@ class Addon(AddonModel):
apparmor_profile, profile_backup_file
)
except HostAppArmorError as err:
raise AddonBackupAppArmorProfileUnknownError(
addon=self.slug
raise AddonsError(
"Can't backup AppArmor profile", _LOGGER.error
) from err
# Write tarfile
@@ -1400,8 +1360,7 @@ class Addon(AddonModel):
)
_LOGGER.info("Finish backup for addon %s", self.slug)
except (tarfile.TarError, OSError, AddFileError) as err:
_LOGGER.error("Can't write backup tarfile for addon %s: %s", self.slug, err)
raise AddonCreateBackupFileUnknownError(addon=self.slug) from err
raise AddonsError(f"Can't write tarfile: {err}", _LOGGER.error) from err
finally:
if was_running:
wait_for_start = await self.end_backup()
@@ -1443,24 +1402,28 @@ class Addon(AddonModel):
try:
tmp, data = await self.sys_run_in_executor(_extract_tarfile)
except tarfile.TarError as err:
_LOGGER.error("Can't extract backup tarfile for %s: %s", self.slug, err)
raise AddonExtractBackupFileUnknownError(addon=self.slug) from err
raise AddonsError(
f"Can't read tarfile {tar_file}: {err}", _LOGGER.error
) from err
except ConfigurationFileError as err:
raise AddonConfigurationFileUnknownError(addon=self.slug) from err
raise AddonsError() from err
try:
# Validate
try:
data = SCHEMA_ADDON_BACKUP(data)
except vol.Invalid as err:
raise AddonBackupMetadataInvalidError(
raise AddonsError(
f"Can't validate {self.slug}, backup data: {humanize_error(data, err)}",
_LOGGER.error,
addon=self.slug,
validation_error=humanize_error(data, err),
) from err
# Validate availability. Raises if not
self._validate_availability(data[ATTR_SYSTEM], logger=_LOGGER.error)
# If available
if not self._available(data[ATTR_SYSTEM]):
raise AddonNotSupportedError(
f"Add-on {self.slug} is not available for this platform",
_LOGGER.error,
)
# Restore local add-on information
_LOGGER.info("Restore config for addon %s", self.slug)
@@ -1519,10 +1482,9 @@ class Addon(AddonModel):
try:
await self.sys_run_in_executor(_restore_data)
except shutil.Error as err:
_LOGGER.error(
"Can't restore origin data for %s: %s", self.slug, err
)
raise AddonRestoreBackupDataUnknownError(addon=self.slug) from err
raise AddonsError(
f"Can't restore origin data: {err}", _LOGGER.error
) from err
# Restore AppArmor
profile_file = Path(tmp.name, "apparmor.txt")
@@ -1533,13 +1495,10 @@ class Addon(AddonModel):
)
except HostAppArmorError as err:
_LOGGER.error(
"Can't restore AppArmor profile for add-on %s: %s",
"Can't restore AppArmor profile for add-on %s",
self.slug,
err,
)
raise AddonRestoreAppArmorProfileUnknownError(
addon=self.slug
) from err
raise AddonsError() from err
finally:
# Is add-on loaded
@@ -1554,6 +1513,13 @@ class Addon(AddonModel):
_LOGGER.info("Finished restore for add-on %s", self.slug)
return wait_for_start
def check_trust(self) -> Awaitable[None]:
"""Calculate Addon docker content trust.
Return Coroutine.
"""
return self.instance.check_trust()
@Job(
name="addon_restart_after_problem",
throttle_period=WATCHDOG_THROTTLE_PERIOD,
@@ -1596,15 +1562,7 @@ class Addon(AddonModel):
)
break
# Exponential backoff to spread retries over the throttle window
delay = WATCHDOG_RETRY_SECONDS * (1 << max(attempts - 1, 0))
_LOGGER.debug(
"Watchdog will retry addon %s in %s seconds (attempt %s)",
self.name,
delay,
attempts + 1,
)
await asyncio.sleep(delay)
await asyncio.sleep(WATCHDOG_RETRY_SECONDS)
async def container_state_changed(self, event: DockerContainerStateEvent) -> None:
"""Set addon state from container state."""

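The watchdog hunk above replaces a fixed retry sleep with an exponential backoff spread over the throttle window. A small sketch of that delay formula, assuming WATCHDOG_RETRY_SECONDS = 5 (the actual constant may differ):

```python
"""Delay schedule from the watchdog backoff shown above.

WATCHDOG_RETRY_SECONDS = 5 is an assumed value for illustration only.
"""
WATCHDOG_RETRY_SECONDS = 5


def retry_delay(attempts: int) -> int:
    """Seconds to wait before the next watchdog restart attempt."""
    return WATCHDOG_RETRY_SECONDS * (1 << max(attempts - 1, 0))


if __name__ == "__main__":
    # attempts 0..5 -> 5, 5, 10, 20, 40, 80 seconds
    print([retry_delay(a) for a in range(6)])
```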
View File

@@ -3,7 +3,6 @@
from __future__ import annotations
from functools import cached_property
import logging
from pathlib import Path
from typing import TYPE_CHECKING, Any
@@ -20,20 +19,13 @@ from ..const import (
)
from ..coresys import CoreSys, CoreSysAttributes
from ..docker.interface import MAP_ARCH
from ..exceptions import (
AddonBuildArchitectureNotSupportedError,
AddonBuildDockerfileMissingError,
ConfigurationFileError,
HassioArchNotFound,
)
from ..exceptions import ConfigurationFileError, HassioArchNotFound
from ..utils.common import FileConfiguration, find_one_filetype
from .validate import SCHEMA_BUILD_CONFIG
if TYPE_CHECKING:
from .manager import AnyAddon
_LOGGER: logging.Logger = logging.getLogger(__name__)
class AddonBuild(FileConfiguration, CoreSysAttributes):
"""Handle build options for add-ons."""
@@ -114,7 +106,7 @@ class AddonBuild(FileConfiguration, CoreSysAttributes):
return self.addon.path_location.joinpath(f"Dockerfile.{self.arch}")
return self.addon.path_location.joinpath("Dockerfile")
async def is_valid(self) -> None:
async def is_valid(self) -> bool:
"""Return true if the build env is valid."""
def build_is_valid() -> bool:
@@ -126,17 +118,9 @@ class AddonBuild(FileConfiguration, CoreSysAttributes):
)
try:
if not await self.sys_run_in_executor(build_is_valid):
raise AddonBuildDockerfileMissingError(
_LOGGER.error, addon=self.addon.slug
)
return await self.sys_run_in_executor(build_is_valid)
except HassioArchNotFound:
raise AddonBuildArchitectureNotSupportedError(
_LOGGER.error,
addon=self.addon.slug,
addon_arch_list=self.addon.supported_arch,
system_arch_list=self.sys_arch.supported,
) from None
return False
def get_docker_args(
self, version: AwesomeVersion, image_tag: str

View File

@@ -103,6 +103,7 @@ from .configuration import FolderMapping
from .const import (
ATTR_BACKUP,
ATTR_BREAKING_VERSIONS,
ATTR_CODENOTARY,
ATTR_PATH,
ATTR_READ_ONLY,
AddonBackupMode,
@@ -631,8 +632,13 @@ class AddonModel(JobGroup, ABC):
@property
def signed(self) -> bool:
"""Currently no signing support."""
return False
"""Return True if the image is signed."""
return ATTR_CODENOTARY in self.data
@property
def codenotary(self) -> str | None:
"""Return Signer email address for CAS."""
return self.data.get(ATTR_CODENOTARY)
@property
def breaking_versions(self) -> list[AwesomeVersion]:

View File

@@ -207,12 +207,6 @@ def _warn_addon_config(config: dict[str, Any]):
name,
)
if ATTR_CODENOTARY in config:
_LOGGER.warning(
"Add-on '%s' uses deprecated 'codenotary' field in config. This field is no longer used and will be ignored. Please report this to the maintainer.",
name,
)
return config
@@ -423,6 +417,7 @@ _SCHEMA_ADDON_CONFIG = vol.Schema(
vol.Optional(ATTR_BACKUP, default=AddonBackupMode.HOT): vol.Coerce(
AddonBackupMode
),
vol.Optional(ATTR_CODENOTARY): vol.Email(),
vol.Optional(ATTR_OPTIONS, default={}): dict,
vol.Optional(ATTR_SCHEMA, default={}): vol.Any(
vol.Schema({str: SCHEMA_ELEMENT}),

View File

@@ -100,8 +100,6 @@ from ..const import (
from ..coresys import CoreSysAttributes
from ..docker.stats import DockerStats
from ..exceptions import (
AddonBootConfigCannotChangeError,
AddonConfigurationInvalidError,
APIAddonNotInstalled,
APIError,
APIForbidden,
@@ -127,7 +125,6 @@ SCHEMA_OPTIONS = vol.Schema(
vol.Optional(ATTR_AUDIO_INPUT): vol.Maybe(str),
vol.Optional(ATTR_INGRESS_PANEL): vol.Boolean(),
vol.Optional(ATTR_WATCHDOG): vol.Boolean(),
vol.Optional(ATTR_OPTIONS): vol.Maybe(dict),
}
)
@@ -303,20 +300,19 @@ class APIAddons(CoreSysAttributes):
# Update secrets for validation
await self.sys_homeassistant.secrets.reload()
# Extend schema with add-on specific validation
addon_schema = SCHEMA_OPTIONS.extend(
{vol.Optional(ATTR_OPTIONS): vol.Maybe(addon.schema)}
)
# Validate/Process Body
body = await api_validate(SCHEMA_OPTIONS, request)
body = await api_validate(addon_schema, request)
if ATTR_OPTIONS in body:
try:
addon.options = addon.schema(body[ATTR_OPTIONS])
except vol.Invalid as ex:
raise AddonConfigurationInvalidError(
addon=addon.slug,
validation_error=humanize_error(body[ATTR_OPTIONS], ex),
) from None
addon.options = body[ATTR_OPTIONS]
if ATTR_BOOT in body:
if addon.boot_config == AddonBootConfig.MANUAL_ONLY:
raise AddonBootConfigCannotChangeError(
addon=addon.slug, boot_config=addon.boot_config.value
raise APIError(
f"Addon {addon.slug} boot option is set to {addon.boot_config} so it cannot be changed"
)
addon.boot = body[ATTR_BOOT]
if ATTR_AUTO_UPDATE in body:

View File

@@ -253,28 +253,18 @@ class APIIngress(CoreSysAttributes):
skip_auto_headers={hdrs.CONTENT_TYPE},
) as result:
headers = _response_header(result)
# Avoid parsing content_type in simple cases for better performance
if maybe_content_type := result.headers.get(hdrs.CONTENT_TYPE):
content_type = (maybe_content_type.partition(";"))[0].strip()
else:
content_type = result.content_type
# Empty body responses (304, 204, HEAD, etc.) should not be streamed,
# otherwise aiohttp < 3.9.0 may generate an invalid "0\r\n\r\n" chunk
# This also avoids setting content_type for empty responses.
if must_be_empty_body(request.method, result.status):
# If upstream contains content-type, preserve it (e.g. for HEAD requests)
if maybe_content_type:
headers[hdrs.CONTENT_TYPE] = content_type
return web.Response(
headers=headers,
status=result.status,
)
# Simple request
if (
hdrs.CONTENT_LENGTH in result.headers
# empty body responses should not be streamed,
# otherwise aiohttp < 3.9.0 may generate
# an invalid "0\r\n\r\n" chunk instead of an empty response.
must_be_empty_body(request.method, result.status)
or hdrs.CONTENT_LENGTH in result.headers
and int(result.headers.get(hdrs.CONTENT_LENGTH, 0)) < 4_194_000
):
# Return Response

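The ingress hunk above decides between returning a buffered `web.Response` and streaming: responses that must have an empty body (204, 304, HEAD) are never streamed, and known body sizes below roughly 4 MB are buffered. The sketch below restates that decision with plain functions; `must_be_empty_body` is simplified here and stands in for aiohttp's helper of the same name.

```python
"""Simplified decision logic from the ingress proxy hunk above.

The helper below is a stand-in for aiohttp's must_be_empty_body; only the
4_194_000 byte threshold and the status handling come from the diff.
"""
SIMPLE_BODY_LIMIT = 4_194_000  # bytes; below this the body is buffered


def must_be_empty_body(method: str, status: int) -> bool:
    """True for responses that may not carry a body (HEAD, 1xx, 204, 304)."""
    return method.upper() == "HEAD" or status in (204, 304) or 100 <= status < 200


def should_buffer(method: str, status: int, content_length: int | None) -> bool:
    """Return True if the upstream response can be returned without streaming."""
    if must_be_empty_body(method, status):
        return True
    return content_length is not None and content_length < SIMPLE_BODY_LIMIT


if __name__ == "__main__":
    print(should_buffer("GET", 200, 1024))   # True: small body, buffered
    print(should_buffer("GET", 200, None))   # False: unknown length, stream it
    print(should_buffer("HEAD", 200, None))  # True: HEAD never has a body
```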
View File

@@ -1,20 +1,24 @@
"""Init file for Supervisor Security RESTful API."""
import asyncio
import logging
from typing import Any
from aiohttp import web
import attr
import voluptuous as vol
from supervisor.exceptions import APIGone
from ..const import ATTR_FORCE_SECURITY, ATTR_PWNED
from ..const import ATTR_CONTENT_TRUST, ATTR_FORCE_SECURITY, ATTR_PWNED
from ..coresys import CoreSysAttributes
from .utils import api_process, api_validate
_LOGGER: logging.Logger = logging.getLogger(__name__)
# pylint: disable=no-value-for-parameter
SCHEMA_OPTIONS = vol.Schema(
{
vol.Optional(ATTR_PWNED): vol.Boolean(),
vol.Optional(ATTR_CONTENT_TRUST): vol.Boolean(),
vol.Optional(ATTR_FORCE_SECURITY): vol.Boolean(),
}
)
@@ -27,6 +31,7 @@ class APISecurity(CoreSysAttributes):
async def info(self, request: web.Request) -> dict[str, Any]:
"""Return Security information."""
return {
ATTR_CONTENT_TRUST: self.sys_security.content_trust,
ATTR_PWNED: self.sys_security.pwned,
ATTR_FORCE_SECURITY: self.sys_security.force,
}
@@ -38,6 +43,8 @@ class APISecurity(CoreSysAttributes):
if ATTR_PWNED in body:
self.sys_security.pwned = body[ATTR_PWNED]
if ATTR_CONTENT_TRUST in body:
self.sys_security.content_trust = body[ATTR_CONTENT_TRUST]
if ATTR_FORCE_SECURITY in body:
self.sys_security.force = body[ATTR_FORCE_SECURITY]
@@ -47,9 +54,6 @@ class APISecurity(CoreSysAttributes):
@api_process
async def integrity_check(self, request: web.Request) -> dict[str, Any]:
"""Run backend integrity check.
CodeNotary integrity checking has been removed. This endpoint now returns
an error indicating the feature is gone.
"""
raise APIGone("Integrity check feature has been removed.")
"""Run backend integrity check."""
result = await asyncio.shield(self.sys_security.integrity_check())
return attr.asdict(result)

View File

@@ -53,7 +53,7 @@ from ..const import (
REQUEST_FROM,
)
from ..coresys import CoreSysAttributes
from ..exceptions import APIError, APIForbidden, APINotFound, StoreAddonNotFoundError
from ..exceptions import APIError, APIForbidden, APINotFound
from ..store.addon import AddonStore
from ..store.repository import Repository
from ..store.validate import validate_repository
@@ -104,7 +104,7 @@ class APIStore(CoreSysAttributes):
addon_slug: str = request.match_info["addon"]
if not (addon := self.sys_addons.get(addon_slug)):
raise StoreAddonNotFoundError(addon=addon_slug)
raise APINotFound(f"Addon {addon_slug} does not exist")
if installed and not addon.is_installed:
raise APIError(f"Addon {addon_slug} is not installed")
@@ -112,7 +112,7 @@ class APIStore(CoreSysAttributes):
if not installed and addon.is_installed:
addon = cast(Addon, addon)
if not addon.addon_store:
raise StoreAddonNotFoundError(addon=addon_slug)
raise APINotFound(f"Addon {addon_slug} does not exist in the store")
return addon.addon_store
return addon

View File

@@ -16,12 +16,14 @@ from ..const import (
ATTR_BLK_READ,
ATTR_BLK_WRITE,
ATTR_CHANNEL,
ATTR_CONTENT_TRUST,
ATTR_COUNTRY,
ATTR_CPU_PERCENT,
ATTR_DEBUG,
ATTR_DEBUG_BLOCK,
ATTR_DETECT_BLOCKING_IO,
ATTR_DIAGNOSTICS,
ATTR_FORCE_SECURITY,
ATTR_HEALTHY,
ATTR_ICON,
ATTR_IP_ADDRESS,
@@ -67,6 +69,8 @@ SCHEMA_OPTIONS = vol.Schema(
vol.Optional(ATTR_DEBUG): vol.Boolean(),
vol.Optional(ATTR_DEBUG_BLOCK): vol.Boolean(),
vol.Optional(ATTR_DIAGNOSTICS): vol.Boolean(),
vol.Optional(ATTR_CONTENT_TRUST): vol.Boolean(),
vol.Optional(ATTR_FORCE_SECURITY): vol.Boolean(),
vol.Optional(ATTR_AUTO_UPDATE): vol.Boolean(),
vol.Optional(ATTR_DETECT_BLOCKING_IO): vol.Coerce(DetectBlockingIO),
vol.Optional(ATTR_COUNTRY): str,

View File

@@ -151,7 +151,7 @@ def api_return_error(
if check_exception_chain(error, DockerAPIError):
message = format_message(message)
if not message:
message = "Unknown error, see Supervisor logs (check with 'ha supervisor logs')"
message = "Unknown error, see supervisor"
match error_type:
case const.CONTENT_TYPE_TEXT:

View File

@@ -9,10 +9,8 @@ from .addons.addon import Addon
from .const import ATTR_PASSWORD, ATTR_TYPE, ATTR_USERNAME, FILE_HASSIO_AUTH
from .coresys import CoreSys, CoreSysAttributes
from .exceptions import (
AuthHomeAssistantAPIValidationError,
AuthInvalidNoneValueError,
AuthError,
AuthListUsersError,
AuthListUsersNoneResponseError,
AuthPasswordResetError,
HomeAssistantAPIError,
HomeAssistantWSError,
@@ -85,8 +83,10 @@ class Auth(FileConfiguration, CoreSysAttributes):
self, addon: Addon, username: str | None, password: str | None
) -> bool:
"""Check username login."""
if username is None or password is None:
raise AuthInvalidNoneValueError(_LOGGER.error)
if password is None:
raise AuthError("None as password is not supported!", _LOGGER.error)
if username is None:
raise AuthError("None as username is not supported!", _LOGGER.error)
_LOGGER.info("Auth request from '%s' for '%s'", addon.slug, username)
@@ -137,7 +137,7 @@ class Auth(FileConfiguration, CoreSysAttributes):
finally:
self._running.pop(username, None)
raise AuthHomeAssistantAPIValidationError()
raise AuthError()
async def change_password(self, username: str, password: str) -> None:
"""Change user password login."""
@@ -155,7 +155,7 @@ class Auth(FileConfiguration, CoreSysAttributes):
except HomeAssistantAPIError as err:
_LOGGER.error("Can't request password reset on Home Assistant: %s", err)
raise AuthPasswordResetError(user=username)
raise AuthPasswordResetError()
async def list_users(self) -> list[dict[str, Any]]:
"""List users on the Home Assistant instance."""
@@ -166,12 +166,15 @@ class Auth(FileConfiguration, CoreSysAttributes):
{ATTR_TYPE: "config/auth/list"}
)
except HomeAssistantWSError as err:
_LOGGER.error("Can't request listing users on Home Assistant: %s", err)
raise AuthListUsersError() from err
raise AuthListUsersError(
f"Can't request listing users on Home Assistant: {err}", _LOGGER.error
) from err
if users is not None:
return users
raise AuthListUsersNoneResponseError(_LOGGER.error)
raise AuthListUsersError(
"Can't request listing users on Home Assistant!", _LOGGER.error
)
@staticmethod
def _rehash(value: str, salt2: str = "") -> str:

View File

@@ -105,6 +105,7 @@ async def initialize_coresys() -> CoreSys:
if coresys.dev:
coresys.updater.channel = UpdateChannel.DEV
coresys.security.content_trust = False
# Convert datetime
logging.Formatter.converter = lambda *args: coresys.now().timetuple()

View File

@@ -2,7 +2,6 @@
from __future__ import annotations
from collections.abc import Awaitable
from contextlib import suppress
from ipaddress import IPv4Address
import logging
@@ -33,7 +32,6 @@ from ..coresys import CoreSys
from ..exceptions import (
CoreDNSError,
DBusError,
DockerBuildError,
DockerError,
DockerJobError,
DockerNotFound,
@@ -681,8 +679,9 @@ class DockerAddon(DockerInterface):
async def _build(self, version: AwesomeVersion, image: str | None = None) -> None:
"""Build a Docker container."""
build_env = await AddonBuild(self.coresys, self.addon).load_config()
# Check if the build environment is valid, raises if not
await build_env.is_valid()
if not await build_env.is_valid():
_LOGGER.error("Invalid build environment, can't build this add-on!")
raise DockerError()
_LOGGER.info("Starting build for %s:%s", self.image, version)
@@ -731,9 +730,8 @@ class DockerAddon(DockerInterface):
self._meta = docker_image.attrs
except (docker.errors.DockerException, requests.RequestException) as err:
raise DockerBuildError(
f"Can't build {self.image}:{version}: {err!s}", _LOGGER.error
) from err
_LOGGER.error("Can't build %s:%s: %s", self.image, version, err)
raise DockerError() from err
_LOGGER.info("Build %s:%s done", self.image, version)
@@ -790,9 +788,12 @@ class DockerAddon(DockerInterface):
on_condition=DockerJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
def write_stdin(self, data: bytes) -> Awaitable[None]:
async def write_stdin(self, data: bytes) -> None:
"""Write to add-on stdin."""
return self.sys_run_in_executor(self._write_stdin, data)
if not await self.is_running():
raise DockerError()
await self.sys_run_in_executor(self._write_stdin, data)
def _write_stdin(self, data: bytes) -> None:
"""Write to add-on stdin.
@@ -845,6 +846,16 @@ class DockerAddon(DockerInterface):
):
self.sys_resolution.dismiss_issue(self.addon.device_access_missing_issue)
async def _validate_trust(self, image_id: str) -> None:
"""Validate trust of content."""
if not self.addon.signed:
return
checksum = image_id.partition(":")[2]
return await self.sys_security.verify_content(
cast(str, self.addon.codenotary), checksum
)
@Job(
name="docker_addon_hardware_events",
conditions=[JobCondition.OS_AGENT],

View File

@@ -5,7 +5,7 @@ from ipaddress import IPv4Address
import logging
import re
from awesomeversion import AwesomeVersion
from awesomeversion import AwesomeVersion, AwesomeVersionCompareException
from docker.types import Mount
from ..const import LABEL_MACHINE
@@ -244,3 +244,13 @@ class DockerHomeAssistant(DockerInterface):
self.image,
self.sys_homeassistant.version,
)
async def _validate_trust(self, image_id: str) -> None:
"""Validate trust of content."""
try:
if self.version in {None, LANDINGPAGE} or self.version < _VERIFY_TRUST:
return
except AwesomeVersionCompareException:
return
await super()._validate_trust(image_id)

View File

@@ -31,12 +31,15 @@ from ..const import (
)
from ..coresys import CoreSys
from ..exceptions import (
CodeNotaryError,
CodeNotaryUntrusted,
DockerAPIError,
DockerError,
DockerJobError,
DockerLogOutOfOrder,
DockerNotFound,
DockerRequestError,
DockerTrustError,
)
from ..jobs import SupervisorJob
from ..jobs.const import JOB_GROUP_DOCKER_INTERFACE, JobConcurrency
@@ -217,7 +220,7 @@ class DockerInterface(JobGroup, ABC):
await self.sys_run_in_executor(self.sys_docker.docker.login, **credentials)
def _process_pull_image_log( # noqa: C901
def _process_pull_image_log(
self, install_job_id: str, reference: PullLogEntry
) -> None:
"""Process events fired from a docker while pulling an image, filtered to a given job id."""
@@ -318,17 +321,13 @@ class DockerInterface(JobGroup, ABC):
},
)
else:
# If we reach DOWNLOAD_COMPLETE without ever having set extra (small layers that skip
# the downloading phase), set a minimal extra so aggregate progress calculation can proceed
extra = job.extra
if stage == PullImageLayerStage.DOWNLOAD_COMPLETE and not job.extra:
extra = {"current": 1, "total": 1}
job.update(
progress=progress,
stage=stage.status,
done=stage == PullImageLayerStage.PULL_COMPLETE,
extra=None if stage == PullImageLayerStage.RETRYING_DOWNLOAD else extra,
extra=None
if stage == PullImageLayerStage.RETRYING_DOWNLOAD
else job.extra,
)
# Once we have received a progress update for every child job, start to set status of the main one
@@ -426,6 +425,18 @@ class DockerInterface(JobGroup, ABC):
platform=MAP_ARCH[image_arch],
)
# Validate content
try:
await self._validate_trust(cast(str, docker_image.id))
except CodeNotaryError:
with suppress(docker.errors.DockerException):
await self.sys_run_in_executor(
self.sys_docker.images.remove,
image=f"{image}:{version!s}",
force=True,
)
raise
# Tag latest
if latest:
_LOGGER.info(
@@ -451,6 +462,16 @@ class DockerInterface(JobGroup, ABC):
raise DockerError(
f"Unknown error with {image}:{version!s} -> {err!s}", _LOGGER.error
) from err
except CodeNotaryUntrusted as err:
raise DockerTrustError(
f"Pulled image {image}:{version!s} failed on content-trust verification!",
_LOGGER.critical,
) from err
except CodeNotaryError as err:
raise DockerTrustError(
f"Error happened on Content-Trust check for {image}:{version!s}: {err!s}",
_LOGGER.error,
) from err
finally:
if listener:
self.sys_bus.remove_listener(listener)
@@ -466,34 +487,35 @@ class DockerInterface(JobGroup, ABC):
return True
return False
async def _get_container(self) -> Container | None:
"""Get docker container, returns None if not found."""
async def is_running(self) -> bool:
"""Return True if Docker is running."""
try:
return await self.sys_run_in_executor(
docker_container = await self.sys_run_in_executor(
self.sys_docker.containers.get, self.name
)
except docker.errors.NotFound:
return None
return False
except docker.errors.DockerException as err:
raise DockerAPIError(
f"Docker API error occurred while getting container information: {err!s}"
) from err
raise DockerAPIError() from err
except requests.RequestException as err:
raise DockerRequestError(
f"Error communicating with Docker to get container information: {err!s}"
) from err
raise DockerRequestError() from err
async def is_running(self) -> bool:
"""Return True if Docker is running."""
if docker_container := await self._get_container():
return docker_container.status == "running"
return False
return docker_container.status == "running"
async def current_state(self) -> ContainerState:
"""Return current state of container."""
if docker_container := await self._get_container():
return _container_state_from_model(docker_container)
return ContainerState.UNKNOWN
try:
docker_container = await self.sys_run_in_executor(
self.sys_docker.containers.get, self.name
)
except docker.errors.NotFound:
return ContainerState.UNKNOWN
except docker.errors.DockerException as err:
raise DockerAPIError() from err
except requests.RequestException as err:
raise DockerRequestError() from err
return _container_state_from_model(docker_container)
@Job(name="docker_interface_attach", concurrency=JobConcurrency.GROUP_QUEUE)
async def attach(
@@ -528,9 +550,7 @@ class DockerInterface(JobGroup, ABC):
# Successful?
if not self._meta:
raise DockerError(
f"Could not get metadata on container or image for {self.name}"
)
raise DockerError()
_LOGGER.info("Attaching to %s with version %s", self.image, self.version)
@Job(
@@ -732,8 +752,14 @@ class DockerInterface(JobGroup, ABC):
async def is_failed(self) -> bool:
"""Return True if Docker is failing state."""
if not (docker_container := await self._get_container()):
try:
docker_container = await self.sys_run_in_executor(
self.sys_docker.containers.get, self.name
)
except docker.errors.NotFound:
return False
except (docker.errors.DockerException, requests.RequestException) as err:
raise DockerError() from err
# container is not running
if docker_container.status != "exited":
@@ -783,3 +809,24 @@ class DockerInterface(JobGroup, ABC):
return self.sys_run_in_executor(
self.sys_docker.container_run_inside, self.name, command
)
async def _validate_trust(self, image_id: str) -> None:
"""Validate trust of content."""
checksum = image_id.partition(":")[2]
return await self.sys_security.verify_own_content(checksum)
@Job(
name="docker_interface_check_trust",
on_condition=DockerJobError,
concurrency=JobConcurrency.GROUP_REJECT,
)
async def check_trust(self) -> None:
"""Check trust of exists Docker image."""
try:
image = await self.sys_run_in_executor(
self.sys_docker.images.get, f"{self.image}:{self.version!s}"
)
except (docker.errors.DockerException, requests.RequestException):
return
await self._validate_trust(cast(str, image.id))

View File

@@ -566,10 +566,7 @@ class DockerAPI(CoreSysAttributes):
except NotFound:
return False
except (DockerException, requests.RequestException) as err:
raise DockerError(
f"Could not get container {name} or image {image}:{version} to check state: {err!s}",
_LOGGER.error,
) from err
raise DockerError() from err
# Check the image is correct and state is good
return (
@@ -585,13 +582,9 @@ class DockerAPI(CoreSysAttributes):
try:
docker_container: Container = self.containers.get(name)
except NotFound:
# Generally suppressed so we don't log this
raise DockerNotFound() from None
except (DockerException, requests.RequestException) as err:
raise DockerError(
f"Could not get container {name} for stopping: {err!s}",
_LOGGER.error,
) from err
raise DockerError() from err
if docker_container.status == "running":
_LOGGER.info("Stopping %s application", name)
@@ -631,13 +624,9 @@ class DockerAPI(CoreSysAttributes):
try:
container: Container = self.containers.get(name)
except NotFound:
raise DockerNotFound(
f"Container {name} not found for restarting", _LOGGER.warning
) from None
raise DockerNotFound() from None
except (DockerException, requests.RequestException) as err:
raise DockerError(
f"Could not get container {name} for restarting: {err!s}", _LOGGER.error
) from err
raise DockerError() from err
_LOGGER.info("Restarting %s", name)
try:
@@ -650,13 +639,9 @@ class DockerAPI(CoreSysAttributes):
try:
docker_container: Container = self.containers.get(name)
except NotFound:
raise DockerNotFound(
f"Container {name} not found for logs", _LOGGER.warning
) from None
raise DockerNotFound() from None
except (DockerException, requests.RequestException) as err:
raise DockerError(
f"Could not get container {name} for logs: {err!s}", _LOGGER.error
) from err
raise DockerError() from err
try:
return docker_container.logs(tail=tail, stdout=True, stderr=True)
@@ -670,13 +655,9 @@ class DockerAPI(CoreSysAttributes):
try:
docker_container: Container = self.containers.get(name)
except NotFound:
raise DockerNotFound(
f"Container {name} not found for stats", _LOGGER.warning
) from None
raise DockerNotFound() from None
except (DockerException, requests.RequestException) as err:
raise DockerError(
f"Could not inspect container '{name}': {err!s}", _LOGGER.error
) from err
raise DockerError() from err
# container is not running
if docker_container.status != "running":
@@ -694,21 +675,15 @@ class DockerAPI(CoreSysAttributes):
try:
docker_container: Container = self.containers.get(name)
except NotFound:
raise DockerNotFound(
f"Container {name} not found for running command", _LOGGER.warning
) from None
raise DockerNotFound() from None
except (DockerException, requests.RequestException) as err:
raise DockerError(
f"Can't get container {name} to run command: {err!s}"
) from err
raise DockerError() from err
# Execute
try:
code, output = docker_container.exec_run(command)
except (DockerException, requests.RequestException) as err:
raise DockerError(
f"Can't run command in container {name}: {err!s}"
) from err
raise DockerError() from err
return CommandReturn(code, output)

View File

@@ -3,23 +3,23 @@
from collections.abc import Callable
from typing import Any
MESSAGE_CHECK_SUPERVISOR_LOGS = (
"Check supervisor logs for details (check with '{logs_command}')"
)
EXTRA_FIELDS_LOGS_COMMAND = {"logs_command": "ha supervisor logs"}
class HassioError(Exception):
"""Root exception."""
error_key: str | None = None
message_template: str | None = None
extra_fields: dict[str, Any] | None = None
def __init__(
self, message: str | None = None, logger: Callable[..., None] | None = None
self,
message: str | None = None,
logger: Callable[..., None] | None = None,
*,
extra_fields: dict[str, Any] | None = None,
) -> None:
"""Raise & log."""
self.extra_fields = extra_fields or {}
if not message and self.message_template:
message = (
self.message_template.format(**self.extra_fields)
@@ -41,82 +41,6 @@ class HassioNotSupportedError(HassioError):
"""Function is not supported."""
# API
class APIError(HassioError, RuntimeError):
"""API errors."""
status = 400
def __init__(
self,
message: str | None = None,
logger: Callable[..., None] | None = None,
*,
job_id: str | None = None,
) -> None:
"""Raise & log, optionally with job."""
super().__init__(message, logger)
self.job_id = job_id
class APIUnauthorized(APIError):
"""API unauthorized error."""
status = 401
class APIForbidden(APIError):
"""API forbidden error."""
status = 403
class APINotFound(APIError):
"""API not found error."""
status = 404
class APIGone(APIError):
"""API is no longer available."""
status = 410
class APIInternalServerError(APIError):
"""API internal server error."""
status = 500
class APIAddonNotInstalled(APIError):
"""Not installed addon requested at addons API."""
class APIDBMigrationInProgress(APIError):
"""Service is unavailable due to an offline DB migration is in progress."""
status = 503
class APIUnknownSupervisorError(APIError):
"""Unknown error occurred within supervisor. Adds supervisor check logs rider to mesage template."""
status = 500
def __init__(
self, logger: Callable[..., None] | None = None, *, job_id: str | None = None
) -> None:
"""Initialize exception."""
self.message_template = (
f"{self.message_template}. {MESSAGE_CHECK_SUPERVISOR_LOGS}"
)
self.extra_fields = (self.extra_fields or {}) | EXTRA_FIELDS_LOGS_COMMAND
super().__init__(None, logger, job_id=job_id)
# JobManager
@@ -198,20 +122,6 @@ class SupervisorAppArmorError(SupervisorError):
"""Supervisor AppArmor error."""
class SupervisorStatsError(SupervisorError, APIInternalServerError):
"""Raise on issue getting stats for Supervisor container."""
error_key = "supervisor_stats_error"
message_template = (
f"Can't get stats for Supervisor container. {MESSAGE_CHECK_SUPERVISOR_LOGS}"
)
extra_fields = EXTRA_FIELDS_LOGS_COMMAND.copy()
def __init__(self, logger: Callable[..., None] | None = None) -> None:
"""Initialize exception."""
super().__init__(None, logger)
class SupervisorJobError(SupervisorError, JobException):
"""Raise on job errors."""
@@ -340,96 +250,6 @@ class AddonConfigurationError(AddonsError):
"""Error with add-on configuration."""
class AddonConfigurationInvalidError(AddonConfigurationError, APIError):
"""Raise if invalid configuration provided for addon."""
error_key = "addon_configuration_invalid_error"
message_template = "Add-on {addon} has invalid options: {validation_error}"
def __init__(
self,
logger: Callable[..., None] | None = None,
*,
addon: str,
validation_error: str,
) -> None:
"""Initialize exception."""
self.extra_fields = {"addon": addon, "validation_error": validation_error}
super().__init__(None, logger)
class AddonBackupMetadataInvalidError(AddonsError, APIError):
"""Raise if invalid metadata file provided for addon in backup."""
error_key = "addon_backup_metadata_invalid_error"
message_template = (
"Metadata file for add-on {addon} in backup is invalid: {validation_error}"
)
def __init__(
self,
logger: Callable[..., None] | None = None,
*,
addon: str,
validation_error: str,
) -> None:
"""Initialize exception."""
self.extra_fields = {"addon": addon, "validation_error": validation_error}
super().__init__(None, logger)
class AddonBootConfigCannotChangeError(AddonsError, APIError):
"""Raise if user attempts to change addon boot config when it can't be changed."""
error_key = "addon_boot_config_cannot_change_error"
message_template = (
"Addon {addon} boot option is set to {boot_config} so it cannot be changed"
)
def __init__(
self, logger: Callable[..., None] | None = None, *, addon: str, boot_config: str
) -> None:
"""Initialize exception."""
self.extra_fields = {"addon": addon, "boot_config": boot_config}
super().__init__(None, logger)
class AddonNotRunningError(AddonsError, APIError):
"""Raise when an addon is not running."""
error_key = "addon_not_running_error"
message_template = "Add-on {addon} is not running"
def __init__(
self, logger: Callable[..., None] | None = None, *, addon: str
) -> None:
"""Initialize exception."""
self.extra_fields = {"addon": addon}
super().__init__(None, logger)
class AddonPrePostBackupCommandReturnedError(AddonsError, APIError):
"""Raise when addon's pre/post backup command returns an error."""
error_key = "addon_pre_post_backup_command_returned_error"
message_template = (
"Pre-/Post backup command for add-on {addon} returned error code: "
"{exit_code}. Please report this to the addon developer. Enable debug "
"logging to capture complete command output using {debug_logging_command}"
)
def __init__(
self, logger: Callable[..., None] | None = None, *, addon: str, exit_code: int
) -> None:
"""Initialize exception."""
self.extra_fields = {
"addon": addon,
"exit_code": exit_code,
"debug_logging_command": "ha supervisor options --logging debug",
}
super().__init__(None, logger)
class AddonNotSupportedError(HassioNotSupportedError):
"""Addon doesn't support a function."""
@@ -448,8 +268,11 @@ class AddonNotSupportedArchitectureError(AddonNotSupportedError):
architectures: list[str],
) -> None:
"""Initialize exception."""
self.extra_fields = {"slug": slug, "architectures": ", ".join(architectures)}
super().__init__(None, logger)
super().__init__(
None,
logger,
extra_fields={"slug": slug, "architectures": ", ".join(architectures)},
)
class AddonNotSupportedMachineTypeError(AddonNotSupportedError):
@@ -466,8 +289,11 @@ class AddonNotSupportedMachineTypeError(AddonNotSupportedError):
machine_types: list[str],
) -> None:
"""Initialize exception."""
self.extra_fields = {"slug": slug, "machine_types": ", ".join(machine_types)}
super().__init__(None, logger)
super().__init__(
None,
logger,
extra_fields={"slug": slug, "machine_types": ", ".join(machine_types)},
)
class AddonNotSupportedHomeAssistantVersionError(AddonNotSupportedError):
@@ -484,307 +310,11 @@ class AddonNotSupportedHomeAssistantVersionError(AddonNotSupportedError):
version: str,
) -> None:
"""Initialize exception."""
self.extra_fields = {"slug": slug, "version": version}
super().__init__(None, logger)
class AddonNotSupportedWriteStdinError(AddonNotSupportedError, APIError):
"""Addon does not support writing to stdin."""
error_key = "addon_not_supported_write_stdin_error"
message_template = "Add-on {addon} does not support writing to stdin"
def __init__(
self, logger: Callable[..., None] | None = None, *, addon: str
) -> None:
"""Initialize exception."""
self.extra_fields = {"addon": addon}
super().__init__(None, logger)
class AddonBuildDockerfileMissingError(AddonNotSupportedError, APIError):
"""Raise when addon build invalid because dockerfile is missing."""
error_key = "addon_build_dockerfile_missing_error"
message_template = (
"Cannot build addon '{addon}' because dockerfile is missing. A repair "
"using '{repair_command}' will fix this if the cause is data "
"corruption. Otherwise please report this to the addon developer."
)
def __init__(
self, logger: Callable[..., None] | None = None, *, addon: str
) -> None:
"""Initialize exception."""
self.extra_fields = {"addon": addon, "repair_command": "ha supervisor repair"}
super().__init__(None, logger)
class AddonBuildArchitectureNotSupportedError(AddonNotSupportedError, APIError):
"""Raise when addon cannot be built on system because it doesn't support its architecture."""
error_key = "addon_build_architecture_not_supported_error"
message_template = (
"Cannot build addon '{addon}' because its supported architectures "
"({addon_arches}) do not match the system supported architectures ({system_arches})"
)
def __init__(
self,
logger: Callable[..., None] | None = None,
*,
addon: str,
addon_arch_list: list[str],
system_arch_list: list[str],
) -> None:
"""Initialize exception."""
self.extra_fields = {
"addon": addon,
"addon_arches": ", ".join(addon_arch_list),
"system_arches": ", ".join(system_arch_list),
}
super().__init__(None, logger)
# pylint: disable-next=too-many-ancestors
class AddonConfigurationFileUnknownError(
AddonConfigurationError, APIUnknownSupervisorError
):
"""Raise when unknown error occurs trying to read/write addon configuration file."""
error_key = "addon_configuration_file_unknown_error"
message_template = (
"An unknown error occurred reading/writing configuration file for {addon}"
)
def __init__(
self, logger: Callable[..., None] | None = None, *, addon: str
) -> None:
"""Initialize exception."""
self.extra_fields = {"addon": addon}
super().__init__(logger)
class AddonBuildImageUnknownError(AddonsError, APIUnknownSupervisorError):
"""Raise when an unknown error occurs during image build."""
error_key = "addon_build_image_unknown_error"
message_template = "An unknown error occurred during build of image for {addon}"
def __init__(
self, logger: Callable[..., None] | None = None, *, addon: str
) -> None:
"""Initialize exception."""
self.extra_fields = {"addon": addon}
super().__init__(logger)
class AddonInstallImageUnknownError(AddonsError, APIUnknownSupervisorError):
"""Raise when an unknown error occurs during image install."""
error_key = "addon_install_image_unknown_error"
message_template = "An unknown error occurred during install of image for {addon}"
def __init__(
self, logger: Callable[..., None] | None = None, *, addon: str
) -> None:
"""Initialize exception."""
self.extra_fields = {"addon": addon}
super().__init__(logger)
class AddonRemoveImageUnknownError(AddonsError, APIUnknownSupervisorError):
"""Raise when an unknown error occurs while removing an image."""
error_key = "addon_remove_image_unknown_error"
message_template = "An unknown error occurred while removing image for {addon}"
def __init__(
self, logger: Callable[..., None] | None = None, *, addon: str
) -> None:
"""Initialize exception."""
self.extra_fields = {"addon": addon}
super().__init__(logger)
class AddonContainerStartUnknownError(AddonsError, APIUnknownSupervisorError):
"""Raise when an unknown error occurs while starting a container."""
error_key = "addon_container_start_unknown_error"
message_template = "An unknown error occurred while starting container for {addon}"
def __init__(
self, logger: Callable[..., None] | None = None, *, addon: str
) -> None:
"""Initialize exception."""
self.extra_fields = {"addon": addon}
super().__init__(logger)
class AddonContainerStopUnknownError(AddonsError, APIUnknownSupervisorError):
"""Raise when an unknown error occurs while stopping a container."""
error_key = "addon_container_stop_unknown_error"
message_template = "An unknown error occurred while stopping container for {addon}"
def __init__(
self, logger: Callable[..., None] | None = None, *, addon: str
) -> None:
"""Initialize exception."""
self.extra_fields = {"addon": addon}
super().__init__(logger)
class AddonContainerStatsUnknownError(AddonsError, APIUnknownSupervisorError):
"""Raise when an unknown error occurs while getting stats of a container."""
error_key = "addon_container_stats_unknown_error"
message_template = (
"An unknown error occurred while getting stats of container for {addon}"
)
def __init__(
self, logger: Callable[..., None] | None = None, *, addon: str
) -> None:
"""Initialize exception."""
self.extra_fields = {"addon": addon}
super().__init__(logger)
class AddonContainerWriteStdinUnknownError(AddonsError, APIUnknownSupervisorError):
"""Raise when an unknown error occurs while writing to stdin of a container."""
error_key = "addon_container_write_stdin_unknown_error"
message_template = (
"An unknown error occurred while writing to stdin of container for {addon}"
)
def __init__(
self, logger: Callable[..., None] | None = None, *, addon: str
) -> None:
"""Initialize exception."""
self.extra_fields = {"addon": addon}
super().__init__(logger)
class AddonContainerRunCommandUnknownError(AddonsError, APIUnknownSupervisorError):
"""Raise when an unknown error occurs while running command inside of a container."""
error_key = "addon_container_run_command_unknown_error"
message_template = "An unknown error occurred while running a command inside of container for {addon}"
def __init__(
self, logger: Callable[..., None] | None = None, *, addon: str
) -> None:
"""Initialize exception."""
self.extra_fields = {"addon": addon}
super().__init__(logger)
class AddonCreateBackupFileUnknownError(AddonsError, APIUnknownSupervisorError):
"""Raise when an unknown error occurs while making the backup file for an addon."""
error_key = "addon_create_backup_file_unknown_error"
message_template = (
"An unknown error occurred while creating the backup file for {addon}"
)
def __init__(
self, logger: Callable[..., None] | None = None, *, addon: str
) -> None:
"""Initialize exception."""
self.extra_fields = {"addon": addon}
super().__init__(logger)
class AddonCreateBackupMetadataFileUnknownError(AddonsError, APIUnknownSupervisorError):
"""Raise when an unknown error occurs while making the metadata file for an addon backup."""
error_key = "addon_create_backup_metadata_file_unknown_error"
message_template = "An unknown error occurred while creating the metadata file for backup of {addon}"
def __init__(
self, logger: Callable[..., None] | None = None, *, addon: str
) -> None:
"""Initialize exception."""
self.extra_fields = {"addon": addon}
super().__init__(logger)
class AddonBackupAppArmorProfileUnknownError(AddonsError, APIUnknownSupervisorError):
"""Raise when an unknown error occurs while backing up the AppArmor profile of an addon."""
error_key = "addon_backup_app_armor_profile_unknown_error"
message_template = (
"An unknown error occurred while backing up the AppArmor profile of {addon}"
)
def __init__(
self, logger: Callable[..., None] | None = None, *, addon: str
) -> None:
"""Initialize exception."""
self.extra_fields = {"addon": addon}
super().__init__(logger)
class AddonBackupExportImageUnknownError(AddonsError, APIUnknownSupervisorError):
"""Raise when an unknown error occurs while exporting image for an addon backup."""
error_key = "addon_backup_export_image_unknown_error"
message_template = (
"An unknown error occurred while exporting image to back up {addon}"
)
def __init__(
self, logger: Callable[..., None] | None = None, *, addon: str
) -> None:
"""Initialize exception."""
self.extra_fields = {"addon": addon}
super().__init__(logger)
class AddonExtractBackupFileUnknownError(AddonsError, APIUnknownSupervisorError):
"""Raise when an unknown error occurs while extracting backup file for an addon."""
error_key = "addon_extract_backup_file_unknown_error"
message_template = (
"An unknown error occurred while extracting the backup file for {addon}"
)
def __init__(
self, logger: Callable[..., None] | None = None, *, addon: str
) -> None:
"""Initialize exception."""
self.extra_fields = {"addon": addon}
super().__init__(logger)
class AddonRestoreBackupDataUnknownError(AddonsError, APIUnknownSupervisorError):
"""Raise when unknown error occurs while restoring data/config for addon from backup."""
error_key = "addon_restore_backup_data_unknown_error"
message_template = "An unknown error occurred while restoring data and config for {addon} from backup"
def __init__(
self, logger: Callable[..., None] | None = None, *, addon: str
) -> None:
"""Initialize exception."""
self.extra_fields = {"addon": addon}
super().__init__(logger)
class AddonRestoreAppArmorProfileUnknownError(AddonsError, APIUnknownSupervisorError):
"""Raise when unknown error occurs while restoring AppArmor profile for addon from backup."""
error_key = "addon_restore_app_armor_profile_unknown_error"
message_template = "An unknown error occurred while restoring AppArmor profile for {addon} from backup"
def __init__(
self, logger: Callable[..., None] | None = None, *, addon: str
) -> None:
"""Initialize exception."""
self.extra_fields = {"addon": addon}
super().__init__(logger)
super().__init__(
None,
logger,
extra_fields={"slug": slug, "version": version},
)
class AddonsJobError(AddonsError, JobException):
@@ -816,59 +346,13 @@ class AuthError(HassioError):
"""Auth errors."""
# This one uses the check logs rider even though it's not a 500 error because it
# is bad practice to return error specifics from a password reset API.
class AuthPasswordResetError(AuthError, APIError):
class AuthPasswordResetError(HassioError):
"""Auth error if password reset failed."""
error_key = "auth_password_reset_error"
message_template = (
f"Unable to reset password for '{{user}}'. {MESSAGE_CHECK_SUPERVISOR_LOGS}"
)
def __init__(self, logger: Callable[..., None] | None = None, *, user: str) -> None:
"""Initialize exception."""
self.extra_fields = {"user": user} | EXTRA_FIELDS_LOGS_COMMAND
super().__init__(None, logger)
class AuthListUsersError(AuthError, APIUnknownSupervisorError):
class AuthListUsersError(HassioError):
"""Auth error if listing users failed."""
error_key = "auth_list_users_error"
message_template = "Can't request listing users on Home Assistant"
class AuthListUsersNoneResponseError(AuthError, APIInternalServerError):
"""Auth error if listing users returned invalid None response."""
error_key = "auth_list_users_none_response_error"
message_template = "Home Assistant returned invalid response of `{none}` instead of a list of users. Check Home Assistant logs for details (check with `{logs_command}`)"
extra_fields = {"none": "None", "logs_command": "ha core logs"}
def __init__(self, logger: Callable[..., None] | None = None) -> None:
"""Initialize exception."""
super().__init__(None, logger)
class AuthInvalidNoneValueError(AuthError, APIUnauthorized):
"""Auth error if None provided as username or password."""
error_key = "auth_invalid_none_value_error"
message_template = "{none} as username or password is not supported"
extra_fields = {"none": "None"}
def __init__(self, logger: Callable[..., None] | None = None) -> None:
"""Initialize exception."""
super().__init__(None, logger)
class AuthHomeAssistantAPIValidationError(AuthError, APIUnknownSupervisorError):
"""Error encountered trying to validate auth details via Home Assistant API."""
error_key = "auth_home_assistant_api_validation_error"
message_template = "Unable to validate authentication details with Home Assistant"
# Host
@@ -901,6 +385,54 @@ class HostLogError(HostError):
"""Internal error with host log."""
# API
class APIError(HassioError, RuntimeError):
"""API errors."""
status = 400
def __init__(
self,
message: str | None = None,
logger: Callable[..., None] | None = None,
*,
job_id: str | None = None,
error: HassioError | None = None,
) -> None:
"""Raise & log, optionally with job."""
# Allow these to be set from another error here since APIErrors essentially wrap others to add a status
self.error_key = error.error_key if error else None
self.message_template = error.message_template if error else None
super().__init__(
message, logger, extra_fields=error.extra_fields if error else None
)
self.job_id = job_id
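# Illustrative sketch (an assumption, not part of this diff): the comment above describes
# how APIError can wrap another HassioError to reuse its error_key, message_template and
# extra_fields while only adding an HTTP status. The same wrapping pattern with minimal,
# self-contained stand-in classes:


class _SketchError(Exception):
    """Toy stand-in for HassioError carrying key, template and extra fields."""

    error_key: str | None = None
    message_template: str | None = None

    def __init__(
        self, message: str | None = None, *, extra_fields: dict | None = None
    ) -> None:
        self.extra_fields = extra_fields or {}
        super().__init__(
            message or (self.message_template or "").format(**self.extra_fields)
        )


class _SketchAPIError(_SketchError):
    """Toy stand-in for APIError: wraps another error and adds a status."""

    status = 400

    def __init__(
        self, message: str | None = None, *, error: _SketchError | None = None
    ) -> None:
        # Reuse key/template/extra_fields of the wrapped error so an API layer can expose them
        self.error_key = error.error_key if error else None
        self.message_template = error.message_template if error else None
        super().__init__(message, extra_fields=error.extra_fields if error else None)


class _SketchAddonNotRunning(_SketchError):
    error_key = "addon_not_running_error"
    message_template = "Add-on {addon} is not running"

    def __init__(self, *, addon: str) -> None:
        super().__init__(extra_fields={"addon": addon})


_inner = _SketchAddonNotRunning(addon="local_example")
_wrapped = _SketchAPIError(str(_inner), error=_inner)
assert _wrapped.status == 400
assert _wrapped.error_key == "addon_not_running_error"
assert _wrapped.extra_fields == {"addon": "local_example"}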
class APIForbidden(APIError):
"""API forbidden error."""
status = 403
class APINotFound(APIError):
"""API not found error."""
status = 404
class APIAddonNotInstalled(APIError):
"""Not installed addon requested at addons API."""
class APIDBMigrationInProgress(APIError):
"""Service is unavailable due to an offline DB migration is in progress."""
status = 503
# Service / Discovery
@@ -1045,6 +577,21 @@ class PwnedConnectivityError(PwnedError):
"""Connectivity errors while checking pwned passwords."""
# util/codenotary
class CodeNotaryError(HassioError):
"""Error general with CodeNotary."""
class CodeNotaryUntrusted(CodeNotaryError):
"""Error on untrusted content."""
class CodeNotaryBackendError(CodeNotaryError):
"""CodeNotary backend error happening."""
# util/whoami
@@ -1078,10 +625,6 @@ class DockerError(HassioError):
"""Docker API/Transport errors."""
class DockerBuildError(DockerError):
"""Docker error during build."""
class DockerAPIError(DockerError):
"""Docker API error."""
@@ -1178,20 +721,6 @@ class StoreNotFound(StoreError):
"""Raise if slug is not known."""
class StoreAddonNotFoundError(StoreError, APINotFound):
"""Raise if a requested addon is not in the store."""
error_key = "store_addon_not_found_error"
message_template = "Addon {addon} does not exist in the store"
def __init__(
self, logger: Callable[..., None] | None = None, *, addon: str
) -> None:
"""Initialize exception."""
self.extra_fields = {"addon": addon}
super().__init__(None, logger)
class StoreJobError(StoreError, JobException):
"""Raise on job error with git."""

View File

@@ -428,6 +428,13 @@ class HomeAssistantCore(JobGroup):
"""
return self.instance.logs()
def check_trust(self) -> Awaitable[None]:
"""Calculate HomeAssistant docker content trust.
Return Coroutine.
"""
return self.instance.check_trust()
async def stats(self) -> DockerStats:
"""Return stats of Home Assistant."""
try:

View File

@@ -371,6 +371,12 @@ class HomeAssistant(FileConfiguration, CoreSysAttributes):
_LOGGER.error,
) from err
if not resp:
raise HomeAssistantBackupError(
"Preparing backup of Home Assistant Core failed. No response from HA Core.",
_LOGGER.error,
)
if resp and not resp.get(ATTR_SUCCESS):
raise HomeAssistantBackupError(
f"Preparing backup of Home Assistant Core failed due to: {resp.get(ATTR_ERROR, {}).get(ATTR_MESSAGE, '')}. Check HA Core logs.",

View File

@@ -225,6 +225,10 @@ class HomeAssistantWebSocket(CoreSysAttributes):
# since it makes a new socket connection and we already have one.
if not connected and not await self.sys_homeassistant.api.check_api_state():
# No core access, don't try.
_LOGGER.debug(
"Home Assistant API is not accessible. Not sending WS message: %s",
message,
)
return False
if not self._client:

View File

@@ -98,9 +98,7 @@ class SupervisorJobError:
"""Representation of an error occurring during a supervisor job."""
type_: type[HassioError] = HassioError
message: str = (
"Unknown error, see Supervisor logs (check with 'ha supervisor logs')"
)
message: str = "Unknown error, see supervisor logs"
stage: str | None = None
def as_dict(self) -> dict[str, str | None]:
@@ -329,17 +327,6 @@ class JobManager(FileConfiguration, CoreSysAttributes):
if not curr_parent.child_job_syncs:
continue
# HACK: If the parent triggers the same child job, we just skip this second
# sync. Maybe it would be better to have this reflected in the job stage
# and reset progress to 0 instead? There is no support for such stage
# information on Core update entities today though.
if curr_parent.done is True or curr_parent.progress >= 100:
_LOGGER.debug(
"Skipping parent job sync for done parent job %s",
curr_parent.name,
)
continue
# Break after first match at each parent as it doesn't make sense
# to match twice. But it could match multiple parents
for sync in curr_parent.child_job_syncs:

View File

@@ -76,6 +76,13 @@ class PluginBase(ABC, FileConfiguration, CoreSysAttributes):
"""Return True if a task is in progress."""
return self.instance.in_progress
def check_trust(self) -> Awaitable[None]:
"""Calculate plugin docker content trust.
Return Coroutine.
"""
return self.instance.check_trust()
def logs(self) -> Awaitable[bytes]:
"""Get docker plugin logs.

View File

@@ -0,0 +1,59 @@
"""Helpers to check supervisor trust."""
import logging
from ...const import CoreState
from ...coresys import CoreSys
from ...exceptions import CodeNotaryError, CodeNotaryUntrusted
from ..const import ContextType, IssueType, UnhealthyReason
from .base import CheckBase
_LOGGER: logging.Logger = logging.getLogger(__name__)
def setup(coresys: CoreSys) -> CheckBase:
"""Check setup function."""
return CheckSupervisorTrust(coresys)
class CheckSupervisorTrust(CheckBase):
"""CheckSystemTrust class for check."""
async def run_check(self) -> None:
"""Run check if not affected by issue."""
if not self.sys_security.content_trust:
_LOGGER.warning(
"Skipping %s, content_trust is globally disabled", self.slug
)
return
try:
await self.sys_supervisor.check_trust()
except CodeNotaryUntrusted:
self.sys_resolution.add_unhealthy_reason(UnhealthyReason.UNTRUSTED)
self.sys_resolution.create_issue(IssueType.TRUST, ContextType.SUPERVISOR)
except CodeNotaryError:
pass
async def approve_check(self, reference: str | None = None) -> bool:
"""Approve check if it is affected by issue."""
try:
await self.sys_supervisor.check_trust()
except CodeNotaryError:
return True
return False
@property
def issue(self) -> IssueType:
"""Return a IssueType enum."""
return IssueType.TRUST
@property
def context(self) -> ContextType:
"""Return a ContextType enum."""
return ContextType.SUPERVISOR
@property
def states(self) -> list[CoreState]:
"""Return a list of valid states when this check can run."""
return [CoreState.RUNNING, CoreState.STARTUP]

View File

@@ -39,6 +39,7 @@ class UnsupportedReason(StrEnum):
APPARMOR = "apparmor"
CGROUP_VERSION = "cgroup_version"
CONNECTIVITY_CHECK = "connectivity_check"
CONTENT_TRUST = "content_trust"
DBUS = "dbus"
DNS_SERVER = "dns_server"
DOCKER_CONFIGURATION = "docker_configuration"
@@ -53,6 +54,7 @@ class UnsupportedReason(StrEnum):
PRIVILEGED = "privileged"
RESTART_POLICY = "restart_policy"
SOFTWARE = "software"
SOURCE_MODS = "source_mods"
SUPERVISOR_VERSION = "supervisor_version"
SYSTEMD = "systemd"
SYSTEMD_JOURNAL = "systemd_journal"
@@ -101,6 +103,7 @@ class IssueType(StrEnum):
PWNED = "pwned"
REBOOT_REQUIRED = "reboot_required"
SECURITY = "security"
TRUST = "trust"
UPDATE_FAILED = "update_failed"
UPDATE_ROLLBACK = "update_rollback"
@@ -112,6 +115,7 @@ class SuggestionType(StrEnum):
CLEAR_FULL_BACKUP = "clear_full_backup"
CREATE_FULL_BACKUP = "create_full_backup"
DISABLE_BOOT = "disable_boot"
EXECUTE_INTEGRITY = "execute_integrity"
EXECUTE_REBOOT = "execute_reboot"
EXECUTE_REBUILD = "execute_rebuild"
EXECUTE_RELOAD = "execute_reload"

View File

@@ -13,6 +13,7 @@ from .validate import get_valid_modules
_LOGGER: logging.Logger = logging.getLogger(__name__)
UNHEALTHY = [
UnsupportedReason.DOCKER_VERSION,
UnsupportedReason.LXC,
UnsupportedReason.PRIVILEGED,
]

View File

@@ -0,0 +1,34 @@
"""Evaluation class for Content Trust."""
from ...const import CoreState
from ...coresys import CoreSys
from ..const import UnsupportedReason
from .base import EvaluateBase
def setup(coresys: CoreSys) -> EvaluateBase:
"""Initialize evaluation-setup function."""
return EvaluateContentTrust(coresys)
class EvaluateContentTrust(EvaluateBase):
"""Evaluate system content trust level."""
@property
def reason(self) -> UnsupportedReason:
"""Return a UnsupportedReason enum."""
return UnsupportedReason.CONTENT_TRUST
@property
def on_failure(self) -> str:
"""Return a string that is printed when self.evaluate is True."""
return "System run with disabled trusted content security."
@property
def states(self) -> list[CoreState]:
"""Return a list of valid states when this evaluation can run."""
return [CoreState.INITIALIZE, CoreState.SETUP, CoreState.RUNNING]
async def evaluate(self) -> bool:
"""Run evaluation."""
return not self.sys_security.content_trust

View File

@@ -0,0 +1,72 @@
"""Evaluation class for Content Trust."""
import errno
import logging
from pathlib import Path
from ...const import CoreState
from ...coresys import CoreSys
from ...exceptions import CodeNotaryError, CodeNotaryUntrusted
from ...utils.codenotary import calc_checksum_path_sourcecode
from ..const import ContextType, IssueType, UnhealthyReason, UnsupportedReason
from .base import EvaluateBase
_SUPERVISOR_SOURCE = Path("/usr/src/supervisor/supervisor")
_LOGGER: logging.Logger = logging.getLogger(__name__)
def setup(coresys: CoreSys) -> EvaluateBase:
"""Initialize evaluation-setup function."""
return EvaluateSourceMods(coresys)
class EvaluateSourceMods(EvaluateBase):
"""Evaluate supervisor source modifications."""
@property
def reason(self) -> UnsupportedReason:
"""Return a UnsupportedReason enum."""
return UnsupportedReason.SOURCE_MODS
@property
def on_failure(self) -> str:
"""Return a string that is printed when self.evaluate is True."""
return "System detect unauthorized source code modifications."
@property
def states(self) -> list[CoreState]:
"""Return a list of valid states when this evaluation can run."""
return [CoreState.RUNNING]
async def evaluate(self) -> bool:
"""Run evaluation."""
if not self.sys_security.content_trust:
_LOGGER.warning("Disabled content-trust, skipping evaluation")
return False
# Calculate checksum of the source code
try:
checksum = await self.sys_run_in_executor(
calc_checksum_path_sourcecode, _SUPERVISOR_SOURCE
)
except OSError as err:
if err.errno == errno.EBADMSG:
self.sys_resolution.add_unhealthy_reason(
UnhealthyReason.OSERROR_BAD_MESSAGE
)
self.sys_resolution.create_issue(
IssueType.CORRUPT_FILESYSTEM, ContextType.SYSTEM
)
_LOGGER.error("Can't calculate checksum of source code: %s", err)
return False
# Validate checksum
try:
await self.sys_security.verify_own_content(checksum)
except CodeNotaryUntrusted:
return True
except CodeNotaryError:
pass
return False

View File

@@ -0,0 +1,67 @@
"""Helpers to check and fix issues with free space."""
from datetime import timedelta
import logging
from ...coresys import CoreSys
from ...exceptions import ResolutionFixupError, ResolutionFixupJobError
from ...jobs.const import JobCondition, JobThrottle
from ...jobs.decorator import Job
from ...security.const import ContentTrustResult
from ..const import ContextType, IssueType, SuggestionType
from .base import FixupBase
_LOGGER: logging.Logger = logging.getLogger(__name__)
def setup(coresys: CoreSys) -> FixupBase:
"""Check setup function."""
return FixupSystemExecuteIntegrity(coresys)
class FixupSystemExecuteIntegrity(FixupBase):
"""Storage class for fixup."""
@Job(
name="fixup_system_execute_integrity_process",
conditions=[JobCondition.INTERNET_SYSTEM],
on_condition=ResolutionFixupJobError,
throttle_period=timedelta(hours=8),
throttle=JobThrottle.THROTTLE,
)
async def process_fixup(self, reference: str | None = None) -> None:
"""Initialize the fixup class."""
result = await self.sys_security.integrity_check()
if ContentTrustResult.FAILED in (result.core, result.supervisor):
raise ResolutionFixupError()
for plugin in result.plugins:
if plugin != ContentTrustResult.FAILED:
continue
raise ResolutionFixupError()
for addon in result.addons:
if addon != ContentTrustResult.FAILED:
continue
raise ResolutionFixupError()
@property
def suggestion(self) -> SuggestionType:
"""Return a SuggestionType enum."""
return SuggestionType.EXECUTE_INTEGRITY
@property
def context(self) -> ContextType:
"""Return a ContextType enum."""
return ContextType.SYSTEM
@property
def issues(self) -> list[IssueType]:
"""Return a IssueType enum list."""
return [IssueType.TRUST]
@property
def auto(self) -> bool:
"""Return if a fixup can be apply as auto fix."""
return True

View File

@@ -0,0 +1,24 @@
"""Security constants."""
from enum import StrEnum
import attr
class ContentTrustResult(StrEnum):
"""Content trust result enum."""
PASS = "pass"
ERROR = "error"
FAILED = "failed"
UNTESTED = "untested"
@attr.s
class IntegrityResult:
"""Result of a full integrity check."""
supervisor: ContentTrustResult = attr.ib(default=ContentTrustResult.UNTESTED)
core: ContentTrustResult = attr.ib(default=ContentTrustResult.UNTESTED)
plugins: dict[str, ContentTrustResult] = attr.ib(default={})
addons: dict[str, ContentTrustResult] = attr.ib(default={})
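# Illustrative usage sketch (an assumption, not taken from this diff): constructing a
# result from the definitions above and flattening it with attr.asdict, as an API layer
# could do for a /security/integrity response. Whether the Supervisor API actually uses
# attr.asdict here is an assumption; attr is already imported at the top of this module.
_example = IntegrityResult(
    supervisor=ContentTrustResult.PASS,
    core=ContentTrustResult.FAILED,
    plugins={"dns": ContentTrustResult.PASS},
    addons={"local_example": ContentTrustResult.UNTESTED},
)
assert attr.asdict(_example)["plugins"]["dns"] == ContentTrustResult.PASS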

View File

@@ -4,12 +4,27 @@ from __future__ import annotations
import logging
from ..const import ATTR_FORCE_SECURITY, ATTR_PWNED, FILE_HASSIO_SECURITY
from ..const import (
ATTR_CONTENT_TRUST,
ATTR_FORCE_SECURITY,
ATTR_PWNED,
FILE_HASSIO_SECURITY,
)
from ..coresys import CoreSys, CoreSysAttributes
from ..exceptions import PwnedError
from ..exceptions import (
CodeNotaryError,
CodeNotaryUntrusted,
PwnedError,
SecurityJobError,
)
from ..jobs.const import JobConcurrency
from ..jobs.decorator import Job, JobCondition
from ..resolution.const import ContextType, IssueType, SuggestionType
from ..utils.codenotary import cas_validate
from ..utils.common import FileConfiguration
from ..utils.pwned import check_pwned_password
from ..validate import SCHEMA_SECURITY_CONFIG
from .const import ContentTrustResult, IntegrityResult
_LOGGER: logging.Logger = logging.getLogger(__name__)
@@ -22,6 +37,16 @@ class Security(FileConfiguration, CoreSysAttributes):
super().__init__(FILE_HASSIO_SECURITY, SCHEMA_SECURITY_CONFIG)
self.coresys = coresys
@property
def content_trust(self) -> bool:
"""Return if content trust is enabled/disabled."""
return self._data[ATTR_CONTENT_TRUST]
@content_trust.setter
def content_trust(self, value: bool) -> None:
"""Set content trust is enabled/disabled."""
self._data[ATTR_CONTENT_TRUST] = value
@property
def force(self) -> bool:
"""Return if force security is enabled/disabled."""
@@ -42,6 +67,30 @@ class Security(FileConfiguration, CoreSysAttributes):
"""Set pwned is enabled/disabled."""
self._data[ATTR_PWNED] = value
async def verify_content(self, signer: str, checksum: str) -> None:
"""Verify content on CAS."""
if not self.content_trust:
_LOGGER.warning("Disabled content-trust, skip validation")
return
try:
await cas_validate(signer, checksum)
except CodeNotaryUntrusted:
raise
except CodeNotaryError:
if self.force:
raise
self.sys_resolution.create_issue(
IssueType.TRUST,
ContextType.SYSTEM,
suggestions=[SuggestionType.EXECUTE_INTEGRITY],
)
return
async def verify_own_content(self, checksum: str) -> None:
"""Verify content from HA org."""
return await self.verify_content("notary@home-assistant.io", checksum)
async def verify_secret(self, pwned_hash: str) -> None:
"""Verify pwned state of a secret."""
if not self.pwned:
@@ -54,3 +103,73 @@ class Security(FileConfiguration, CoreSysAttributes):
if self.force:
raise
return
@Job(
name="security_manager_integrity_check",
conditions=[JobCondition.INTERNET_SYSTEM],
on_condition=SecurityJobError,
concurrency=JobConcurrency.REJECT,
)
async def integrity_check(self) -> IntegrityResult:
"""Run a full system integrity check of the platform.
We only allow to install trusted content.
This is a out of the band manual check.
"""
result: IntegrityResult = IntegrityResult()
if not self.content_trust:
_LOGGER.warning(
"Skipping integrity check, content_trust is globally disabled"
)
return result
# Supervisor
try:
await self.sys_supervisor.check_trust()
result.supervisor = ContentTrustResult.PASS
except CodeNotaryUntrusted:
result.supervisor = ContentTrustResult.ERROR
self.sys_resolution.create_issue(IssueType.TRUST, ContextType.SUPERVISOR)
except CodeNotaryError:
result.supervisor = ContentTrustResult.FAILED
# Core
try:
await self.sys_homeassistant.core.check_trust()
result.core = ContentTrustResult.PASS
except CodeNotaryUntrusted:
result.core = ContentTrustResult.ERROR
self.sys_resolution.create_issue(IssueType.TRUST, ContextType.CORE)
except CodeNotaryError:
result.core = ContentTrustResult.FAILED
# Plugins
for plugin in self.sys_plugins.all_plugins:
try:
await plugin.check_trust()
result.plugins[plugin.slug] = ContentTrustResult.PASS
except CodeNotaryUntrusted:
result.plugins[plugin.slug] = ContentTrustResult.ERROR
self.sys_resolution.create_issue(
IssueType.TRUST, ContextType.PLUGIN, reference=plugin.slug
)
except CodeNotaryError:
result.plugins[plugin.slug] = ContentTrustResult.FAILED
# Add-ons
for addon in self.sys_addons.installed:
if not addon.signed:
result.addons[addon.slug] = ContentTrustResult.UNTESTED
continue
try:
await addon.check_trust()
result.addons[addon.slug] = ContentTrustResult.PASS
except CodeNotaryUntrusted:
result.addons[addon.slug] = ContentTrustResult.ERROR
self.sys_resolution.create_issue(
IssueType.TRUST, ContextType.ADDON, reference=addon.slug
)
except CodeNotaryError:
result.addons[addon.slug] = ContentTrustResult.FAILED
return result

View File

@@ -25,16 +25,19 @@ from .coresys import CoreSys, CoreSysAttributes
from .docker.stats import DockerStats
from .docker.supervisor import DockerSupervisor
from .exceptions import (
CodeNotaryError,
CodeNotaryUntrusted,
DockerError,
HostAppArmorError,
SupervisorAppArmorError,
SupervisorError,
SupervisorJobError,
SupervisorStatsError,
SupervisorUpdateError,
)
from .jobs.const import JobCondition, JobThrottle
from .jobs.decorator import Job
from .resolution.const import ContextType, IssueType, UnhealthyReason
from .utils.codenotary import calc_checksum
from .utils.sentry import async_capture_exception
_LOGGER: logging.Logger = logging.getLogger(__name__)
@@ -147,6 +150,20 @@ class Supervisor(CoreSysAttributes):
_LOGGER.error,
) from err
# Validate
try:
await self.sys_security.verify_own_content(calc_checksum(data))
except CodeNotaryUntrusted as err:
raise SupervisorAppArmorError(
"Content-Trust is broken for the AppArmor profile fetch!",
_LOGGER.critical,
) from err
except CodeNotaryError as err:
raise SupervisorAppArmorError(
f"CodeNotary error while processing AppArmor fetch: {err!s}",
_LOGGER.error,
) from err
# Load
temp_dir: TemporaryDirectory | None = None
@@ -256,12 +273,19 @@ class Supervisor(CoreSysAttributes):
"""
return self.instance.logs()
def check_trust(self) -> Awaitable[None]:
"""Calculate Supervisor docker content trust.
Return Coroutine.
"""
return self.instance.check_trust()
async def stats(self) -> DockerStats:
"""Return stats of Supervisor."""
try:
return await self.instance.stats()
except DockerError as err:
raise SupervisorStatsError() from err
raise SupervisorError() from err
async def repair(self):
"""Repair local Supervisor data."""

View File

@@ -31,8 +31,14 @@ from .const import (
UpdateChannel,
)
from .coresys import CoreSys, CoreSysAttributes
from .exceptions import UpdaterError, UpdaterJobError
from .exceptions import (
CodeNotaryError,
CodeNotaryUntrusted,
UpdaterError,
UpdaterJobError,
)
from .jobs.decorator import Job, JobCondition
from .utils.codenotary import calc_checksum
from .utils.common import FileConfiguration
from .validate import SCHEMA_UPDATER_CONFIG
@@ -283,6 +289,19 @@ class Updater(FileConfiguration, CoreSysAttributes):
self.sys_bus.remove_listener(self._connectivity_listener)
self._connectivity_listener = None
# Validate
try:
await self.sys_security.verify_own_content(calc_checksum(data))
except CodeNotaryUntrusted as err:
raise UpdaterError(
"Content-Trust is broken for the version file fetch!", _LOGGER.critical
) from err
except CodeNotaryError as err:
raise UpdaterError(
f"CodeNotary error while processing version fetch: {err!s}",
_LOGGER.error,
) from err
# Parse data
try:
data = json.loads(data)

View File

@@ -0,0 +1,109 @@
"""Small wrapper for CodeNotary."""
from __future__ import annotations
import asyncio
import hashlib
import json
import logging
from pathlib import Path
import shlex
from typing import Final
from dirhash import dirhash
from ..exceptions import CodeNotaryBackendError, CodeNotaryError, CodeNotaryUntrusted
from . import clean_env
_LOGGER: logging.Logger = logging.getLogger(__name__)
_CAS_CMD: str = (
"cas authenticate --signerID {signer} --silent --output json --hash {sum}"
)
_CACHE: set[tuple[str, str]] = set()
_ATTR_ERROR: Final = "error"
_ATTR_STATUS: Final = "status"
_FALLBACK_ERROR: Final = "Unknown CodeNotary backend issue"
def calc_checksum(data: str | bytes) -> str:
"""Generate checksum for CodeNotary."""
if isinstance(data, str):
return hashlib.sha256(data.encode()).hexdigest()
return hashlib.sha256(data).hexdigest()
def calc_checksum_path_sourcecode(folder: Path) -> str:
"""Calculate checksum for a path source code.
Need catch OSError.
"""
return dirhash(folder.as_posix(), "sha256", match=["*.py"])
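# Quick illustration (not from the diff): calc_checksum is a plain SHA-256 hex digest and
# behaves identically for str and bytes input, while calc_checksum_path_sourcecode hashes
# only the *.py files of a tree via dirhash.
assert calc_checksum("abc") == hashlib.sha256(b"abc").hexdigest()
assert calc_checksum(b"abc") == calc_checksum("abc")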
# pylint: disable=unreachable
async def cas_validate(
signer: str,
checksum: str,
) -> None:
"""Validate data against CodeNotary."""
return
if (checksum, signer) in _CACHE:
return
# Generate command for request
command = shlex.split(_CAS_CMD.format(signer=signer, sum=checksum))
# Request notary authorization
_LOGGER.debug("Send cas command: %s", command)
try:
proc = await asyncio.create_subprocess_exec(
*command,
stdin=asyncio.subprocess.DEVNULL,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=clean_env(),
)
async with asyncio.timeout(15):
data, error = await proc.communicate()
except TimeoutError:
raise CodeNotaryBackendError(
"Timeout while processing CodeNotary", _LOGGER.warning
) from None
except OSError as err:
raise CodeNotaryError(
f"CodeNotary fatal error: {err!s}", _LOGGER.critical
) from err
# Check if Notarized
if proc.returncode != 0 and not data:
if error:
try:
error = error.decode("utf-8")
except UnicodeDecodeError as err:
raise CodeNotaryBackendError(_FALLBACK_ERROR, _LOGGER.warning) from err
if "not notarized" in error:
raise CodeNotaryUntrusted()
else:
error = _FALLBACK_ERROR
raise CodeNotaryBackendError(error, _LOGGER.warning)
# Parse data
try:
data_json = json.loads(data)
_LOGGER.debug("CodeNotary response with: %s", data_json)
except (json.JSONDecodeError, UnicodeDecodeError) as err:
raise CodeNotaryError(
f"Can't parse CodeNotary output: {data!s} - {err!s}", _LOGGER.error
) from err
if _ATTR_ERROR in data_json:
raise CodeNotaryBackendError(data_json[_ATTR_ERROR], _LOGGER.warning)
if data_json[_ATTR_STATUS] == 0:
_CACHE.add((checksum, signer))
else:
raise CodeNotaryUntrusted()
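# Illustrative sketch (an assumption, not part of this diff): the argv list produced by
# shlex.split of the _CAS_CMD template above for a given signer and checksum.
_example_cmd = shlex.split(
    _CAS_CMD.format(signer="notary@home-assistant.io", sum="a" * 64)
)
assert _example_cmd[:4] == [
    "cas",
    "authenticate",
    "--signerID",
    "notary@home-assistant.io",
]
assert _example_cmd[-2:] == ["--hash", "a" * 64]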

View File

@@ -12,6 +12,7 @@ from .const import (
ATTR_AUTO_UPDATE,
ATTR_CHANNEL,
ATTR_CLI,
ATTR_CONTENT_TRUST,
ATTR_COUNTRY,
ATTR_DEBUG,
ATTR_DEBUG_BLOCK,
@@ -228,6 +229,7 @@ SCHEMA_INGRESS_CONFIG = vol.Schema(
# pylint: disable=no-value-for-parameter
SCHEMA_SECURITY_CONFIG = vol.Schema(
{
vol.Optional(ATTR_CONTENT_TRUST, default=True): vol.Boolean(),
vol.Optional(ATTR_PWNED, default=True): vol.Boolean(),
vol.Optional(ATTR_FORCE_SECURITY, default=False): vol.Boolean(),
},

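# Illustrative sketch (an assumption, not part of this diff): with voluptuous, validating
# an empty mapping against vol.Optional(..., default=...) keys fills in the defaults, so a
# fresh security configuration starts with content trust and pwned checks enabled and
# force mode off. The literal key names assume ATTR_CONTENT_TRUST == "content_trust",
# ATTR_PWNED == "pwned" and ATTR_FORCE_SECURITY == "force_security".
_example_defaults = SCHEMA_SECURITY_CONFIG({})
assert _example_defaults["content_trust"] is True
assert _example_defaults["pwned"] is True
assert _example_defaults["force_security"] is False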
View File

@@ -938,7 +938,7 @@ async def test_addon_load_succeeds_with_docker_errors(
coresys.docker.images.get.side_effect = ImageNotFound("missing")
caplog.clear()
await install_addon_ssh.load()
assert "Cannot build addon 'local_ssh' because dockerfile is missing" in caplog.text
assert "Invalid build environment" in caplog.text
# Image build failure
coresys.docker.images.build.side_effect = DockerException()

View File

@@ -3,12 +3,10 @@
from unittest.mock import PropertyMock, patch
from awesomeversion import AwesomeVersion
import pytest
from supervisor.addons.addon import Addon
from supervisor.addons.build import AddonBuild
from supervisor.coresys import CoreSys
from supervisor.exceptions import AddonBuildDockerfileMissingError
from tests.common import is_in_list
@@ -104,11 +102,11 @@ async def test_build_valid(coresys: CoreSys, install_addon_ssh: Addon):
type(coresys.arch), "default", new=PropertyMock(return_value="aarch64")
),
):
assert (await build.is_valid()) is None
assert await build.is_valid()
async def test_build_invalid(coresys: CoreSys, install_addon_ssh: Addon):
"""Test build not supported because Dockerfile missing for specified architecture."""
"""Test platform set in docker args."""
build = await AddonBuild(coresys, install_addon_ssh).load_config()
with (
patch.object(
@@ -117,6 +115,5 @@ async def test_build_invalid(coresys: CoreSys, install_addon_ssh: Addon):
patch.object(
type(coresys.arch), "default", new=PropertyMock(return_value="amd64")
),
pytest.raises(AddonBuildDockerfileMissingError),
):
await build.is_valid()
assert not await build.is_valid()

View File

@@ -482,11 +482,6 @@ async def test_addon_options_boot_mode_manual_only_invalid(
body["message"]
== "Addon local_example boot option is set to manual_only so it cannot be changed"
)
assert body["error_key"] == "addon_boot_config_cannot_change_error"
assert body["extra_fields"] == {
"addon": "local_example",
"boot_config": "manual_only",
}
async def get_message(resp: ClientResponse, json_expected: bool) -> str:
@@ -555,94 +550,3 @@ async def test_addon_not_installed(
resp = await api_client.request(method, url)
assert resp.status == 400
assert await get_message(resp, json_expected) == "Addon is not installed"
async def test_addon_set_options(api_client: TestClient, install_addon_example: Addon):
"""Test setting options for an addon."""
resp = await api_client.post(
"/addons/local_example/options", json={"options": {"message": "test"}}
)
assert resp.status == 200
assert install_addon_example.options == {"message": "test"}
async def test_addon_set_options_error(
api_client: TestClient, install_addon_example: Addon
):
"""Test setting options for an addon."""
resp = await api_client.post(
"/addons/local_example/options", json={"options": {"message": True}}
)
assert resp.status == 400
body = await resp.json()
assert (
body["message"]
== "Add-on local_example has invalid options: not a valid value. Got {'message': True}"
)
assert body["error_key"] == "addon_configuration_invalid_error"
assert body["extra_fields"] == {
"addon": "local_example",
"validation_error": "not a valid value. Got {'message': True}",
}
async def test_addon_start_options_error(
api_client: TestClient,
install_addon_example: Addon,
caplog: pytest.LogCaptureFixture,
):
"""Test error writing options when trying to start addon."""
install_addon_example.options = {"message": "hello"}
# Simulate OS error trying to write the file
with patch("supervisor.utils.json.atomic_write", side_effect=OSError("fail")):
resp = await api_client.post("/addons/local_example/start")
assert resp.status == 500
body = await resp.json()
assert (
body["message"]
== "An unknown error occurred reading/writing configuration file for local_example. Check supervisor logs for details (check with 'ha supervisor logs')"
)
assert body["error_key"] == "addon_configuration_file_unknown_error"
assert body["extra_fields"] == {
"addon": "local_example",
"logs_command": "ha supervisor logs",
}
assert "Add-on local_example can't write options" in caplog.text
# Simulate an update with a breaking change for options schema creating failure on start
caplog.clear()
install_addon_example.data["schema"] = {"message": "bool"}
resp = await api_client.post("/addons/local_example/start")
assert resp.status == 400
body = await resp.json()
assert (
body["message"]
== "Add-on local_example has invalid options: expected boolean. Got {'message': 'hello'}"
)
assert body["error_key"] == "addon_configuration_invalid_error"
assert body["extra_fields"] == {
"addon": "local_example",
"validation_error": "expected boolean. Got {'message': 'hello'}",
}
assert (
"Add-on local_example has invalid options: expected boolean. Got {'message': 'hello'}"
in caplog.text
)
@pytest.mark.parametrize(("method", "action"), [("get", "stats"), ("post", "stdin")])
async def test_addon_not_running_error(
api_client: TestClient, install_addon_example: Addon, method: str, action: str
):
"""Test addon not running error for endpoints that require that."""
with patch.object(
Addon, "with_stdin", return_value=PropertyMock(return_value=True)
):
resp = await api_client.request(method, f"/addons/local_example/{action}")
assert resp.status == 400
body = await resp.json()
assert body["message"] == "Add-on local_example is not running"
assert body["error_key"] == "addon_not_running_error"
assert body["extra_fields"] == {"addon": "local_example"}

View File

@@ -6,11 +6,9 @@ from unittest.mock import AsyncMock, MagicMock, patch
from aiohttp.hdrs import WWW_AUTHENTICATE
from aiohttp.test_utils import TestClient
import pytest
from securetar import Any
from supervisor.addons.addon import Addon
from supervisor.coresys import CoreSys
from supervisor.exceptions import HomeAssistantAPIError, HomeAssistantWSError
from tests.common import MockResponse
from tests.const import TEST_ADDON_SLUG
@@ -102,52 +100,6 @@ async def test_password_reset(
assert "Successful password reset for 'john'" in caplog.text
@pytest.mark.parametrize(
("post_mock", "expected_log"),
[
(
MagicMock(return_value=MockResponse(status=400)),
"The user 'john' is not registered",
),
(
MagicMock(side_effect=HomeAssistantAPIError("fail")),
"Can't request password reset on Home Assistant: fail",
),
],
)
async def test_failed_password_reset(
api_client: TestClient,
coresys: CoreSys,
caplog: pytest.LogCaptureFixture,
websession: MagicMock,
post_mock: MagicMock,
expected_log: str,
):
"""Test failed password reset."""
coresys.homeassistant.api.access_token = "abc123"
# pylint: disable-next=protected-access
coresys.homeassistant.api._access_token_expires = datetime.now(tz=UTC) + timedelta(
days=1
)
websession.post = post_mock
resp = await api_client.post(
"/auth/reset", json={"username": "john", "password": "doe"}
)
assert resp.status == 400
body = await resp.json()
assert (
body["message"]
== "Unable to reset password for 'john'. Check supervisor logs for details (check with 'ha supervisor logs')"
)
assert body["error_key"] == "auth_password_reset_error"
assert body["extra_fields"] == {
"user": "john",
"logs_command": "ha supervisor logs",
}
assert expected_log in caplog.text
async def test_list_users(
api_client: TestClient, coresys: CoreSys, ha_ws_client: AsyncMock
):
@@ -168,48 +120,6 @@ async def test_list_users(
]
@pytest.mark.parametrize(
("send_command_mock", "error_response", "expected_log"),
[
(
AsyncMock(return_value=None),
{
"result": "error",
"message": "Home Assistant returned invalid response of `None` instead of a list of users. Check Home Assistant logs for details (check with `ha core logs`)",
"error_key": "auth_list_users_none_response_error",
"extra_fields": {"none": "None", "logs_command": "ha core logs"},
},
"Home Assistant returned invalid response of `None` instead of a list of users. Check Home Assistant logs for details (check with `ha core logs`)",
),
(
AsyncMock(side_effect=HomeAssistantWSError("fail")),
{
"result": "error",
"message": "Can't request listing users on Home Assistant. Check supervisor logs for details (check with 'ha supervisor logs')",
"error_key": "auth_list_users_error",
"extra_fields": {"logs_command": "ha supervisor logs"},
},
"Can't request listing users on Home Assistant: fail",
),
],
)
async def test_list_users_failure(
api_client: TestClient,
ha_ws_client: AsyncMock,
caplog: pytest.LogCaptureFixture,
send_command_mock: AsyncMock,
error_response: dict[str, Any],
expected_log: str,
):
"""Test failure listing users via API."""
ha_ws_client.async_send_command = send_command_mock
resp = await api_client.get("/auth/list")
assert resp.status == 500
result = await resp.json()
assert result == error_response
assert expected_log in caplog.text
@pytest.mark.parametrize(
("field", "api_client"),
[("username", TEST_ADDON_SLUG), ("user", TEST_ADDON_SLUG)],

View File

@@ -1,28 +1,12 @@
"""Test ingress API."""
from collections.abc import AsyncGenerator
from unittest.mock import AsyncMock, MagicMock, patch
from unittest.mock import AsyncMock, patch
import aiohttp
from aiohttp import hdrs, web
from aiohttp.test_utils import TestClient, TestServer
import pytest
from aiohttp.test_utils import TestClient
from supervisor.addons.addon import Addon
from supervisor.coresys import CoreSys
@pytest.fixture(name="real_websession")
async def fixture_real_websession(
coresys: CoreSys,
) -> AsyncGenerator[aiohttp.ClientSession]:
"""Fixture for real aiohttp ClientSession for ingress proxy tests."""
session = aiohttp.ClientSession()
coresys._websession = session # pylint: disable=W0212
yield session
await session.close()
async def test_validate_session(api_client: TestClient, coresys: CoreSys):
"""Test validating ingress session."""
with patch("aiohttp.web_request.BaseRequest.__getitem__", return_value=None):
@@ -102,126 +86,3 @@ async def test_validate_session_with_user_id(
assert (
coresys.ingress.get_session_data(session).user.display_name == "Some Name"
)
async def test_ingress_proxy_no_content_type_for_empty_body_responses(
api_client: TestClient, coresys: CoreSys, real_websession: aiohttp.ClientSession
):
"""Test that empty body responses don't get Content-Type header."""
# Create a mock add-on backend server that returns various status codes
async def mock_addon_handler(request: web.Request) -> web.Response:
"""Mock add-on handler that returns different status codes based on path."""
path = request.path
if path == "/204":
# 204 No Content - should not have Content-Type
return web.Response(status=204)
elif path == "/304":
# 304 Not Modified - should not have Content-Type
return web.Response(status=304)
elif path == "/100":
# 100 Continue - should not have Content-Type
return web.Response(status=100)
elif path == "/head":
# HEAD request - should have Content-Type (same as GET would)
return web.Response(body=b"test", content_type="text/html")
elif path == "/200":
# 200 OK with body - should have Content-Type
return web.Response(body=b"test content", content_type="text/plain")
elif path == "/200-no-content-type":
# 200 OK without explicit Content-Type - should get default
return web.Response(body=b"test content")
elif path == "/200-json":
# 200 OK with JSON - should preserve Content-Type
return web.Response(
body=b'{"key": "value"}', content_type="application/json"
)
else:
return web.Response(body=b"default", content_type="text/html")
# Create test server for mock add-on
app = web.Application()
app.router.add_route("*", "/{tail:.*}", mock_addon_handler)
addon_server = TestServer(app)
await addon_server.start_server()
try:
# Create ingress session
resp = await api_client.post("/ingress/session")
result = await resp.json()
session = result["data"]["session"]
# Create a mock add-on
mock_addon = MagicMock(spec=Addon)
mock_addon.slug = "test_addon"
mock_addon.ip_address = addon_server.host
mock_addon.ingress_port = addon_server.port
mock_addon.ingress_stream = False
# Generate an ingress token and register the add-on
ingress_token = coresys.ingress.create_session()
with patch.object(coresys.ingress, "get", return_value=mock_addon):
# Test 204 No Content - should NOT have Content-Type
resp = await api_client.get(
f"/ingress/{ingress_token}/204",
cookies={"ingress_session": session},
)
assert resp.status == 204
assert hdrs.CONTENT_TYPE not in resp.headers
# Test 304 Not Modified - should NOT have Content-Type
resp = await api_client.get(
f"/ingress/{ingress_token}/304",
cookies={"ingress_session": session},
)
assert resp.status == 304
assert hdrs.CONTENT_TYPE not in resp.headers
# Test HEAD request - SHOULD have Content-Type (same as GET)
# per RFC 9110: HEAD should return same headers as GET
resp = await api_client.head(
f"/ingress/{ingress_token}/head",
cookies={"ingress_session": session},
)
assert resp.status == 200
assert hdrs.CONTENT_TYPE in resp.headers
assert "text/html" in resp.headers[hdrs.CONTENT_TYPE]
# Body should be empty for HEAD
body = await resp.read()
assert body == b""
# Test 200 OK with body - SHOULD have Content-Type
resp = await api_client.get(
f"/ingress/{ingress_token}/200",
cookies={"ingress_session": session},
)
assert resp.status == 200
assert hdrs.CONTENT_TYPE in resp.headers
assert resp.headers[hdrs.CONTENT_TYPE] == "text/plain"
body = await resp.read()
assert body == b"test content"
# Test 200 OK without explicit Content-Type - SHOULD get default
resp = await api_client.get(
f"/ingress/{ingress_token}/200-no-content-type",
cookies={"ingress_session": session},
)
assert resp.status == 200
assert hdrs.CONTENT_TYPE in resp.headers
# Should get application/octet-stream as default from aiohttp ClientResponse
assert "application/octet-stream" in resp.headers[hdrs.CONTENT_TYPE]
# Test 200 OK with JSON - SHOULD preserve Content-Type
resp = await api_client.get(
f"/ingress/{ingress_token}/200-json",
cookies={"ingress_session": session},
)
assert resp.status == 200
assert hdrs.CONTENT_TYPE in resp.headers
assert "application/json" in resp.headers[hdrs.CONTENT_TYPE]
body = await resp.read()
assert body == b'{"key": "value"}'
finally:
await addon_server.close()

View File

@@ -17,6 +17,16 @@ async def test_api_security_options_force_security(api_client, coresys: CoreSys)
assert coresys.security.force
@pytest.mark.asyncio
async def test_api_security_options_content_trust(api_client, coresys: CoreSys):
"""Test security options content trust."""
assert coresys.security.content_trust
await api_client.post("/security/options", json={"content_trust": False})
assert not coresys.security.content_trust
@pytest.mark.asyncio
async def test_api_security_options_pwned(api_client, coresys: CoreSys):
"""Test security options pwned."""
@@ -31,8 +41,11 @@ async def test_api_security_options_pwned(api_client, coresys: CoreSys):
async def test_api_integrity_check(
api_client, coresys: CoreSys, supervisor_internet: AsyncMock
):
"""Test security integrity check - now deprecated."""
resp = await api_client.post("/security/integrity")
"""Test security integrity check."""
coresys.security.content_trust = False
# CodeNotary integrity check has been removed, should return 410 Gone
assert resp.status == 410
resp = await api_client.post("/security/integrity")
result = await resp.json()
assert result["data"]["core"] == "untested"
assert result["data"]["supervisor"] == "untested"

View File

@@ -323,10 +323,7 @@ async def test_store_addon_not_found(
"""Test store addon not found error."""
resp = await api_client.request(method, url)
assert resp.status == 404
assert (
await get_message(resp, json_expected)
== "Addon bad does not exist in the store"
)
assert await get_message(resp, json_expected) == "Addon bad does not exist"
@pytest.mark.parametrize(

View File

@@ -7,7 +7,6 @@ from unittest.mock import AsyncMock, MagicMock, PropertyMock, patch
from aiohttp.test_utils import TestClient
from awesomeversion import AwesomeVersion
from blockbuster import BlockingError
from docker.errors import DockerException
import pytest
from supervisor.const import CoreState
@@ -418,37 +417,3 @@ async def test_api_progress_updates_supervisor_update(
"done": True,
},
]
async def test_api_supervisor_stats(api_client: TestClient, coresys: CoreSys):
"""Test supervisor stats."""
coresys.docker.containers.get.return_value.status = "running"
coresys.docker.containers.get.return_value.stats.return_value = load_json_fixture(
"container_stats.json"
)
resp = await api_client.get("/supervisor/stats")
assert resp.status == 200
result = await resp.json()
assert result["data"]["cpu_percent"] == 90.0
assert result["data"]["memory_usage"] == 59700000
assert result["data"]["memory_limit"] == 4000000000
assert result["data"]["memory_percent"] == 1.49
async def test_supervisor_api_stats_failure(
api_client: TestClient, coresys: CoreSys, caplog: pytest.LogCaptureFixture
):
"""Test supervisor stats failure."""
coresys.docker.containers.get.side_effect = DockerException("fail")
resp = await api_client.get("/supervisor/stats")
assert resp.status == 500
body = await resp.json()
assert (
body["message"]
== "Can't get stats for Supervisor container. Check supervisor logs for details (check with 'ha supervisor logs')"
)
assert body["error_key"] == "supervisor_stats_error"
assert body["extra_fields"] == {"logs_command": "ha supervisor logs"}
assert "Could not inspect container 'hassio_supervisor': fail" in caplog.text

View File

@@ -31,6 +31,15 @@ from supervisor.jobs import JobSchedulerOptions, SupervisorJob
from tests.common import load_json_fixture
@pytest.fixture(autouse=True)
def mock_verify_content(coresys: CoreSys):
"""Mock verify_content utility during tests."""
with patch.object(
coresys.security, "verify_content", return_value=None
) as verify_content:
yield verify_content
@pytest.mark.parametrize(
"cpu_arch, platform",
[
@@ -569,109 +578,3 @@ async def test_install_progress_handles_download_restart(
await event.wait()
capture_exception.assert_not_called()
async def test_install_progress_handles_layers_skipping_download(
coresys: CoreSys,
test_docker_interface: DockerInterface,
capture_exception: Mock,
):
"""Test install handles small layers that skip downloading phase and go directly to download complete.
Reproduces the real-world scenario from Supervisor issue #6286:
- Small layer (02a6e69d8d00) completes Download complete at 10:14:08 without ever Downloading
- Normal layer (3f4a84073184) starts Downloading at 10:14:09 with progress updates
"""
coresys.core.set_state(CoreState.RUNNING)
# Reproduce EXACT sequence from SupervisorNoUpdateProgressLogs.txt:
# Small layer (02a6e69d8d00) completes BEFORE normal layer (3f4a84073184) starts downloading
coresys.docker.docker.api.pull.return_value = [
{"status": "Pulling from test/image", "id": "latest"},
# Small layer that skips downloading (02a6e69d8d00 in logs, 96 bytes)
{"status": "Pulling fs layer", "progressDetail": {}, "id": "02a6e69d8d00"},
{"status": "Pulling fs layer", "progressDetail": {}, "id": "3f4a84073184"},
{"status": "Waiting", "progressDetail": {}, "id": "02a6e69d8d00"},
{"status": "Waiting", "progressDetail": {}, "id": "3f4a84073184"},
# Goes straight to Download complete (10:14:08 in logs) - THIS IS THE KEY MOMENT
{"status": "Download complete", "progressDetail": {}, "id": "02a6e69d8d00"},
# Normal layer that downloads (3f4a84073184 in logs, 25MB)
# Downloading starts (10:14:09 in logs) - progress updates should happen NOW!
{
"status": "Downloading",
"progressDetail": {"current": 260937, "total": 25371463},
"progress": "[> ] 260.9kB/25.37MB",
"id": "3f4a84073184",
},
{
"status": "Downloading",
"progressDetail": {"current": 5505024, "total": 25371463},
"progress": "[==========> ] 5.505MB/25.37MB",
"id": "3f4a84073184",
},
{
"status": "Downloading",
"progressDetail": {"current": 11272192, "total": 25371463},
"progress": "[======================> ] 11.27MB/25.37MB",
"id": "3f4a84073184",
},
{"status": "Download complete", "progressDetail": {}, "id": "3f4a84073184"},
{
"status": "Extracting",
"progressDetail": {"current": 25371463, "total": 25371463},
"progress": "[==================================================>] 25.37MB/25.37MB",
"id": "3f4a84073184",
},
{"status": "Pull complete", "progressDetail": {}, "id": "3f4a84073184"},
# Small layer finally extracts (10:14:58 in logs)
{
"status": "Extracting",
"progressDetail": {"current": 96, "total": 96},
"progress": "[==================================================>] 96B/96B",
"id": "02a6e69d8d00",
},
{"status": "Pull complete", "progressDetail": {}, "id": "02a6e69d8d00"},
{"status": "Digest: sha256:test"},
{"status": "Status: Downloaded newer image for test/image:latest"},
]
# Capture immutable snapshots of install job progress using job.as_dict()
# This solves the mutable object problem - we snapshot state at call time
install_job_snapshots = []
original_on_job_change = coresys.jobs._on_job_change # pylint: disable=W0212
def capture_and_forward(job_obj, attribute, value):
# Capture immutable snapshot if this is the install job with progress
if job_obj.name == "docker_interface_install" and job_obj.progress > 0:
install_job_snapshots.append(job_obj.as_dict())
# Forward to original to maintain functionality
return original_on_job_change(job_obj, attribute, value)
with patch.object(coresys.jobs, "_on_job_change", side_effect=capture_and_forward):
event = asyncio.Event()
job, install_task = coresys.jobs.schedule_job(
test_docker_interface.install,
JobSchedulerOptions(),
AwesomeVersion("1.2.3"),
"test",
)
async def listen_for_job_end(reference: SupervisorJob):
if reference.uuid != job.uuid:
return
event.set()
coresys.bus.register_event(BusEvent.SUPERVISOR_JOB_END, listen_for_job_end)
await install_task
await event.wait()
# First update from layer download should have rather low progress ((260937/25445459) / 2 ~ 0.5%)
assert install_job_snapshots[0]["progress"] < 1
# Total 8 events should lead to a progress update on the install job
assert len(install_job_snapshots) == 8
# Job should complete successfully
assert job.done is True
assert job.progress == 100
capture_exception.assert_not_called()

View File

@@ -7,8 +7,8 @@ import pytest
from supervisor.coresys import CoreSys
from supervisor.dbus.const import DeviceType
from supervisor.host.configuration import Interface, VlanConfig, WifiConfig
from supervisor.host.const import AuthMethod, InterfaceType, WifiMode
from supervisor.host.configuration import Interface, VlanConfig
from supervisor.host.const import InterfaceType
from tests.dbus_service_mocks.base import DBusServiceMock
from tests.dbus_service_mocks.network_connection_settings import (
@@ -291,237 +291,3 @@ async def test_equals_dbus_interface_eth0_10_real(
# Test should pass with matching VLAN config
assert test_vlan_interface.equals_dbus_interface(network_interface) is True
def test_map_nm_wifi_non_wireless_interface():
"""Test _map_nm_wifi returns None for non-wireless interface."""
# Mock non-wireless interface
mock_interface = Mock()
mock_interface.type = DeviceType.ETHERNET
mock_interface.settings = Mock()
result = Interface._map_nm_wifi(mock_interface)
assert result is None
def test_map_nm_wifi_no_settings():
"""Test _map_nm_wifi returns None when interface has no settings."""
# Mock wireless interface without settings
mock_interface = Mock()
mock_interface.type = DeviceType.WIRELESS
mock_interface.settings = None
result = Interface._map_nm_wifi(mock_interface)
assert result is None
def test_map_nm_wifi_open_authentication():
"""Test _map_nm_wifi with open authentication (no security)."""
# Mock wireless interface with open authentication
mock_interface = Mock()
mock_interface.type = DeviceType.WIRELESS
mock_interface.settings = Mock()
mock_interface.settings.wireless_security = None
mock_interface.settings.wireless = Mock()
mock_interface.settings.wireless.ssid = "TestSSID"
mock_interface.settings.wireless.mode = "infrastructure"
mock_interface.wireless = None
mock_interface.interface_name = "wlan0"
result = Interface._map_nm_wifi(mock_interface)
assert result is not None
assert isinstance(result, WifiConfig)
assert result.mode == WifiMode.INFRASTRUCTURE
assert result.ssid == "TestSSID"
assert result.auth == AuthMethod.OPEN
assert result.psk is None
assert result.signal is None
def test_map_nm_wifi_wep_authentication():
"""Test _map_nm_wifi with WEP authentication."""
# Mock wireless interface with WEP authentication
mock_interface = Mock()
mock_interface.type = DeviceType.WIRELESS
mock_interface.settings = Mock()
mock_interface.settings.wireless_security = Mock()
mock_interface.settings.wireless_security.key_mgmt = "none"
mock_interface.settings.wireless_security.psk = None
mock_interface.settings.wireless = Mock()
mock_interface.settings.wireless.ssid = "WEPNetwork"
mock_interface.settings.wireless.mode = "infrastructure"
mock_interface.wireless = None
mock_interface.interface_name = "wlan0"
result = Interface._map_nm_wifi(mock_interface)
assert result is not None
assert isinstance(result, WifiConfig)
assert result.auth == AuthMethod.WEP
assert result.ssid == "WEPNetwork"
assert result.psk is None
def test_map_nm_wifi_wpa_psk_authentication():
"""Test _map_nm_wifi with WPA-PSK authentication."""
# Mock wireless interface with WPA-PSK authentication
mock_interface = Mock()
mock_interface.type = DeviceType.WIRELESS
mock_interface.settings = Mock()
mock_interface.settings.wireless_security = Mock()
mock_interface.settings.wireless_security.key_mgmt = "wpa-psk"
mock_interface.settings.wireless_security.psk = "SecretPassword123"
mock_interface.settings.wireless = Mock()
mock_interface.settings.wireless.ssid = "SecureNetwork"
mock_interface.settings.wireless.mode = "infrastructure"
mock_interface.wireless = None
mock_interface.interface_name = "wlan0"
result = Interface._map_nm_wifi(mock_interface)
assert result is not None
assert isinstance(result, WifiConfig)
assert result.auth == AuthMethod.WPA_PSK
assert result.ssid == "SecureNetwork"
assert result.psk == "SecretPassword123"
def test_map_nm_wifi_unsupported_authentication():
"""Test _map_nm_wifi returns None for unsupported authentication method."""
# Mock wireless interface with unsupported authentication
mock_interface = Mock()
mock_interface.type = DeviceType.WIRELESS
mock_interface.settings = Mock()
mock_interface.settings.wireless_security = Mock()
mock_interface.settings.wireless_security.key_mgmt = "wpa-eap" # Unsupported
mock_interface.settings.wireless = Mock()
mock_interface.settings.wireless.ssid = "EnterpriseNetwork"
mock_interface.interface_name = "wlan0"
result = Interface._map_nm_wifi(mock_interface)
assert result is None
def test_map_nm_wifi_different_modes():
"""Test _map_nm_wifi with different wifi modes."""
modes_to_test = [
("infrastructure", WifiMode.INFRASTRUCTURE),
("mesh", WifiMode.MESH),
("adhoc", WifiMode.ADHOC),
("ap", WifiMode.AP),
]
for mode_value, expected_mode in modes_to_test:
mock_interface = Mock()
mock_interface.type = DeviceType.WIRELESS
mock_interface.settings = Mock()
mock_interface.settings.wireless_security = None
mock_interface.settings.wireless = Mock()
mock_interface.settings.wireless.ssid = "TestSSID"
mock_interface.settings.wireless.mode = mode_value
mock_interface.wireless = None
mock_interface.interface_name = "wlan0"
result = Interface._map_nm_wifi(mock_interface)
assert result is not None
assert result.mode == expected_mode
def test_map_nm_wifi_with_signal():
"""Test _map_nm_wifi with wireless signal strength."""
# Mock wireless interface with active connection and signal
mock_interface = Mock()
mock_interface.type = DeviceType.WIRELESS
mock_interface.settings = Mock()
mock_interface.settings.wireless_security = None
mock_interface.settings.wireless = Mock()
mock_interface.settings.wireless.ssid = "TestSSID"
mock_interface.settings.wireless.mode = "infrastructure"
mock_interface.wireless = Mock()
mock_interface.wireless.active = Mock()
mock_interface.wireless.active.strength = 75
mock_interface.interface_name = "wlan0"
result = Interface._map_nm_wifi(mock_interface)
assert result is not None
assert result.signal == 75
def test_map_nm_wifi_without_signal():
"""Test _map_nm_wifi without wireless signal (no active connection)."""
# Mock wireless interface without active connection
mock_interface = Mock()
mock_interface.type = DeviceType.WIRELESS
mock_interface.settings = Mock()
mock_interface.settings.wireless_security = None
mock_interface.settings.wireless = Mock()
mock_interface.settings.wireless.ssid = "TestSSID"
mock_interface.settings.wireless.mode = "infrastructure"
mock_interface.wireless = None
mock_interface.interface_name = "wlan0"
result = Interface._map_nm_wifi(mock_interface)
assert result is not None
assert result.signal is None
def test_map_nm_wifi_wireless_no_active_ap():
"""Test _map_nm_wifi with wireless object but no active access point."""
# Mock wireless interface with wireless object but no active AP
mock_interface = Mock()
mock_interface.type = DeviceType.WIRELESS
mock_interface.settings = Mock()
mock_interface.settings.wireless_security = None
mock_interface.settings.wireless = Mock()
mock_interface.settings.wireless.ssid = "TestSSID"
mock_interface.settings.wireless.mode = "infrastructure"
mock_interface.wireless = Mock()
mock_interface.wireless.active = None
mock_interface.interface_name = "wlan0"
result = Interface._map_nm_wifi(mock_interface)
assert result is not None
assert result.signal is None
def test_map_nm_wifi_no_wireless_settings():
"""Test _map_nm_wifi when wireless settings are missing."""
# Mock wireless interface without wireless settings
mock_interface = Mock()
mock_interface.type = DeviceType.WIRELESS
mock_interface.settings = Mock()
mock_interface.settings.wireless_security = None
mock_interface.settings.wireless = None
mock_interface.wireless = None
mock_interface.interface_name = "wlan0"
result = Interface._map_nm_wifi(mock_interface)
assert result is not None
assert result.ssid == ""
assert result.mode == WifiMode.INFRASTRUCTURE # Default mode
def test_map_nm_wifi_no_wireless_mode():
"""Test _map_nm_wifi when wireless mode is not specified."""
# Mock wireless interface without mode specified
mock_interface = Mock()
mock_interface.type = DeviceType.WIRELESS
mock_interface.settings = Mock()
mock_interface.settings.wireless_security = None
mock_interface.settings.wireless = Mock()
mock_interface.settings.wireless.ssid = "TestSSID"
mock_interface.settings.wireless.mode = None
mock_interface.wireless = None
mock_interface.interface_name = "wlan0"
result = Interface._map_nm_wifi(mock_interface)
assert result is not None
assert result.mode == WifiMode.INFRASTRUCTURE # Default mode

View File

@@ -198,7 +198,7 @@ async def test_notify_on_change(coresys: CoreSys, ha_ws_client: AsyncMock):
"errors": [
{
"type": "HassioError",
"message": "Unknown error, see Supervisor logs (check with 'ha supervisor logs')",
"message": "Unknown error, see supervisor logs",
"stage": "test",
}
],
@@ -226,7 +226,7 @@ async def test_notify_on_change(coresys: CoreSys, ha_ws_client: AsyncMock):
"errors": [
{
"type": "HassioError",
"message": "Unknown error, see Supervisor logs (check with 'ha supervisor logs')",
"message": "Unknown error, see supervisor logs",
"stage": "test",
}
],

View File

@@ -181,6 +181,7 @@ async def test_reload_updater_triggers_supervisor_update(
"""Test an updater reload triggers a supervisor update if there is one."""
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
await coresys.core.set_state(CoreState.RUNNING)
coresys.security.content_trust = False
with (
patch.object(

View File

@@ -17,6 +17,7 @@ from supervisor.exceptions import (
AudioJobError,
CliError,
CliJobError,
CodeNotaryUntrusted,
CoreDNSError,
CoreDNSJobError,
DockerError,
@@ -336,12 +337,14 @@ async def test_repair_failed(
patch.object(
DockerInterface, "arch", new=PropertyMock(return_value=CpuArch.AMD64)
),
patch.object(DockerInterface, "install", side_effect=DockerError),
patch(
"supervisor.security.module.cas_validate", side_effect=CodeNotaryUntrusted
),
):
await plugin.repair()
capture_exception.assert_called_once()
assert check_exception_chain(capture_exception.call_args[0][0], DockerError)
assert check_exception_chain(capture_exception.call_args[0][0], CodeNotaryUntrusted)
@pytest.mark.parametrize(

View File

@@ -51,6 +51,7 @@ async def test_if_check_make_issue(coresys: CoreSys):
"""Test check for setup."""
free_space = Issue(IssueType.FREE_SPACE, ContextType.SYSTEM)
await coresys.core.set_state(CoreState.RUNNING)
coresys.security.content_trust = False
with patch("shutil.disk_usage", return_value=(1, 1, 1)):
await coresys.resolution.check.check_system()
@@ -62,6 +63,7 @@ async def test_if_check_cleanup_issue(coresys: CoreSys):
"""Test check for setup."""
free_space = Issue(IssueType.FREE_SPACE, ContextType.SYSTEM)
await coresys.core.set_state(CoreState.RUNNING)
coresys.security.content_trust = False
with patch("shutil.disk_usage", return_value=(1, 1, 1)):
await coresys.resolution.check.check_system()

View File

@@ -0,0 +1,96 @@
"""Test Check Supervisor trust."""
# pylint: disable=import-error,protected-access
from unittest.mock import AsyncMock, patch
from supervisor.const import CoreState
from supervisor.coresys import CoreSys
from supervisor.exceptions import CodeNotaryError, CodeNotaryUntrusted
from supervisor.resolution.checks.supervisor_trust import CheckSupervisorTrust
from supervisor.resolution.const import IssueType, UnhealthyReason
async def test_base(coresys: CoreSys):
"""Test check basics."""
supervisor_trust = CheckSupervisorTrust(coresys)
assert supervisor_trust.slug == "supervisor_trust"
assert supervisor_trust.enabled
async def test_check(coresys: CoreSys):
"""Test check."""
supervisor_trust = CheckSupervisorTrust(coresys)
await coresys.core.set_state(CoreState.RUNNING)
assert len(coresys.resolution.issues) == 0
coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryError)
await supervisor_trust.run_check()
assert coresys.supervisor.check_trust.called
coresys.supervisor.check_trust = AsyncMock(return_value=None)
await supervisor_trust.run_check()
assert coresys.supervisor.check_trust.called
assert len(coresys.resolution.issues) == 0
coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
await supervisor_trust.run_check()
assert coresys.supervisor.check_trust.called
assert len(coresys.resolution.issues) == 1
assert coresys.resolution.issues[-1].type == IssueType.TRUST
assert UnhealthyReason.UNTRUSTED in coresys.resolution.unhealthy
async def test_approve(coresys: CoreSys):
"""Test check."""
supervisor_trust = CheckSupervisorTrust(coresys)
await coresys.core.set_state(CoreState.RUNNING)
coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
assert await supervisor_trust.approve_check()
coresys.supervisor.check_trust = AsyncMock(return_value=None)
assert not await supervisor_trust.approve_check()
async def test_with_global_disable(coresys: CoreSys, caplog):
"""Test when pwned is globally disabled."""
coresys.security.content_trust = False
supervisor_trust = CheckSupervisorTrust(coresys)
await coresys.core.set_state(CoreState.RUNNING)
assert len(coresys.resolution.issues) == 0
coresys.security.verify_own_content = AsyncMock(side_effect=CodeNotaryUntrusted)
await supervisor_trust.run_check()
assert not coresys.security.verify_own_content.called
assert (
"Skipping supervisor_trust, content_trust is globally disabled" in caplog.text
)
async def test_did_run(coresys: CoreSys):
"""Test that the check ran as expected."""
supervisor_trust = CheckSupervisorTrust(coresys)
should_run = supervisor_trust.states
should_not_run = [state for state in CoreState if state not in should_run]
assert len(should_run) != 0
assert len(should_not_run) != 0
with patch(
"supervisor.resolution.checks.supervisor_trust.CheckSupervisorTrust.run_check",
return_value=None,
) as check:
for state in should_run:
await coresys.core.set_state(state)
await supervisor_trust()
check.assert_called_once()
check.reset_mock()
for state in should_not_run:
await coresys.core.set_state(state)
await supervisor_trust()
check.assert_not_called()
check.reset_mock()
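
These tests only exercise a small surface of the check class. As a reference for that surface, here is a hypothetical, self-contained sketch derived purely from the attributes and calls asserted above (slug, enabled, states, run_check, approve_check); it is not the Supervisor implementation in supervisor/resolution/checks/supervisor_trust.py.

# Hypothetical sketch of the contract the tests above exercise; not Supervisor code.
class CodeNotaryError(Exception):
    """Stand-in for supervisor.exceptions.CodeNotaryError."""

class CodeNotaryUntrusted(CodeNotaryError):
    """Stand-in for supervisor.exceptions.CodeNotaryUntrusted."""

class SupervisorTrustCheck:
    slug = "supervisor_trust"
    enabled = True
    states = ("RUNNING",)  # assumed; the real check lists CoreState members

    def __init__(self, coresys):
        self.coresys = coresys
        self.issues: list[str] = []

    async def run_check(self) -> None:
        # Only CodeNotaryUntrusted produces an issue; a plain CodeNotaryError
        # (backend unreachable) is ignored, matching test_check above.
        try:
            await self.coresys.supervisor.check_trust()
        except CodeNotaryUntrusted:
            self.issues.append("trust")
        except CodeNotaryError:
            pass

    async def approve_check(self) -> bool:
        # The issue is kept only while the content is still untrusted.
        try:
            await self.coresys.supervisor.check_trust()
        except CodeNotaryUntrusted:
            return True
        except CodeNotaryError:
            pass
        return False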

View File

@@ -0,0 +1,46 @@
"""Test evaluation base."""
# pylint: disable=import-error,protected-access
from unittest.mock import patch
from supervisor.const import CoreState
from supervisor.coresys import CoreSys
from supervisor.resolution.evaluations.content_trust import EvaluateContentTrust
async def test_evaluation(coresys: CoreSys):
"""Test evaluation."""
job_conditions = EvaluateContentTrust(coresys)
await coresys.core.set_state(CoreState.SETUP)
await job_conditions()
assert job_conditions.reason not in coresys.resolution.unsupported
coresys.security.content_trust = False
await job_conditions()
assert job_conditions.reason in coresys.resolution.unsupported
async def test_did_run(coresys: CoreSys):
"""Test that the evaluation ran as expected."""
job_conditions = EvaluateContentTrust(coresys)
should_run = job_conditions.states
should_not_run = [state for state in CoreState if state not in should_run]
assert len(should_run) != 0
assert len(should_not_run) != 0
with patch(
"supervisor.resolution.evaluations.content_trust.EvaluateContentTrust.evaluate",
return_value=None,
) as evaluate:
for state in should_run:
await coresys.core.set_state(state)
await job_conditions()
evaluate.assert_called_once()
evaluate.reset_mock()
for state in should_not_run:
await coresys.core.set_state(state)
await job_conditions()
evaluate.assert_not_called()
evaluate.reset_mock()

View File

@@ -0,0 +1,89 @@
"""Test evaluation base."""
# pylint: disable=import-error,protected-access
import errno
import os
from pathlib import Path
from unittest.mock import AsyncMock, patch
from supervisor.const import CoreState
from supervisor.coresys import CoreSys
from supervisor.exceptions import CodeNotaryError, CodeNotaryUntrusted
from supervisor.resolution.const import ContextType, IssueType
from supervisor.resolution.data import Issue
from supervisor.resolution.evaluations.source_mods import EvaluateSourceMods
async def test_evaluation(coresys: CoreSys):
"""Test evaluation."""
with patch(
"supervisor.resolution.evaluations.source_mods._SUPERVISOR_SOURCE",
Path(f"{os.getcwd()}/supervisor"),
):
sourcemods = EvaluateSourceMods(coresys)
await coresys.core.set_state(CoreState.RUNNING)
assert sourcemods.reason not in coresys.resolution.unsupported
coresys.security.verify_own_content = AsyncMock(side_effect=CodeNotaryUntrusted)
await sourcemods()
assert sourcemods.reason in coresys.resolution.unsupported
coresys.security.verify_own_content = AsyncMock(side_effect=CodeNotaryError)
await sourcemods()
assert sourcemods.reason not in coresys.resolution.unsupported
coresys.security.verify_own_content = AsyncMock()
await sourcemods()
assert sourcemods.reason not in coresys.resolution.unsupported
async def test_did_run(coresys: CoreSys):
"""Test that the evaluation ran as expected."""
sourcemods = EvaluateSourceMods(coresys)
should_run = sourcemods.states
should_not_run = [state for state in CoreState if state not in should_run]
assert len(should_run) != 0
assert len(should_not_run) != 0
with patch(
"supervisor.resolution.evaluations.source_mods.EvaluateSourceMods.evaluate",
return_value=None,
) as evaluate:
for state in should_run:
await coresys.core.set_state(state)
await sourcemods()
evaluate.assert_called_once()
evaluate.reset_mock()
for state in should_not_run:
await coresys.core.set_state(state)
await sourcemods()
evaluate.assert_not_called()
evaluate.reset_mock()
async def test_evaluation_error(coresys: CoreSys):
"""Test error reading file during evaluation."""
sourcemods = EvaluateSourceMods(coresys)
await coresys.core.set_state(CoreState.RUNNING)
corrupt_fs = Issue(IssueType.CORRUPT_FILESYSTEM, ContextType.SYSTEM)
assert sourcemods.reason not in coresys.resolution.unsupported
assert corrupt_fs not in coresys.resolution.issues
with patch(
"supervisor.utils.codenotary.dirhash",
side_effect=(err := OSError()),
):
err.errno = errno.EBUSY
await sourcemods()
assert sourcemods.reason not in coresys.resolution.unsupported
assert corrupt_fs in coresys.resolution.issues
assert coresys.core.healthy is True
coresys.resolution.dismiss_issue(corrupt_fs)
err.errno = errno.EBADMSG
await sourcemods()
assert sourcemods.reason not in coresys.resolution.unsupported
assert corrupt_fs in coresys.resolution.issues
assert coresys.core.healthy is False
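
test_evaluation_error pins down how an OSError raised by dirhash is classified: EBUSY adds a corrupt-filesystem issue but leaves the system healthy, while EBADMSG adds the issue and also marks the system unhealthy. A minimal sketch of that rule follows (hypothetical helper, not the Supervisor code).

import errno

def classify_dirhash_error(err: OSError) -> tuple[bool, bool]:
    """Return (add_corrupt_fs_issue, mark_unhealthy) as asserted above."""
    if err.errno == errno.EBADMSG:
        return True, True   # corruption severe enough to compromise health
    if err.errno == errno.EBUSY:
        return True, False  # transient: report the issue, stay healthy
    return True, False      # other errnos are not pinned down by the test

assert classify_dirhash_error(OSError(errno.EBUSY, "busy")) == (True, False)
assert classify_dirhash_error(OSError(errno.EBADMSG, "bad message")) == (True, True)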

View File

@@ -0,0 +1,69 @@
"""Test evaluation base."""
# pylint: disable=import-error,protected-access
from datetime import timedelta
from unittest.mock import AsyncMock
import time_machine
from supervisor.coresys import CoreSys
from supervisor.resolution.const import ContextType, IssueType, SuggestionType
from supervisor.resolution.data import Issue, Suggestion
from supervisor.resolution.fixups.system_execute_integrity import (
FixupSystemExecuteIntegrity,
)
from supervisor.security.const import ContentTrustResult, IntegrityResult
from supervisor.utils.dt import utcnow
async def test_fixup(coresys: CoreSys, supervisor_internet: AsyncMock):
"""Test fixup."""
system_execute_integrity = FixupSystemExecuteIntegrity(coresys)
assert system_execute_integrity.auto
coresys.resolution.add_suggestion(
Suggestion(SuggestionType.EXECUTE_INTEGRITY, ContextType.SYSTEM)
)
coresys.resolution.add_issue(Issue(IssueType.TRUST, ContextType.SYSTEM))
coresys.security.integrity_check = AsyncMock(
return_value=IntegrityResult(
ContentTrustResult.PASS,
ContentTrustResult.PASS,
{"audio": ContentTrustResult.PASS},
)
)
await system_execute_integrity()
assert coresys.security.integrity_check.called
assert len(coresys.resolution.suggestions) == 0
assert len(coresys.resolution.issues) == 0
async def test_fixup_error(coresys: CoreSys, supervisor_internet: AsyncMock):
"""Test fixup."""
system_execute_integrity = FixupSystemExecuteIntegrity(coresys)
assert system_execute_integrity.auto
coresys.resolution.add_suggestion(
Suggestion(SuggestionType.EXECUTE_INTEGRITY, ContextType.SYSTEM)
)
coresys.resolution.add_issue(Issue(IssueType.TRUST, ContextType.SYSTEM))
coresys.security.integrity_check = AsyncMock(
return_value=IntegrityResult(
ContentTrustResult.FAILED,
ContentTrustResult.PASS,
{"audio": ContentTrustResult.PASS},
)
)
with time_machine.travel(utcnow() + timedelta(hours=24)):
await system_execute_integrity()
assert coresys.security.integrity_check.called
assert len(coresys.resolution.suggestions) == 1
assert len(coresys.resolution.issues) == 1
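
Together these two tests define the dismissal rule: the EXECUTE_INTEGRITY suggestion and the TRUST issue are only cleared when every component of the integrity result passed, and the time_machine jump in test_fixup_error is presumably there to step past a throttle on integrity_check. A hypothetical condensation of the dismissal rule, reusing the imports from this test file:

from supervisor.security.const import ContentTrustResult, IntegrityResult

def integrity_passed(result: IntegrityResult) -> bool:
    """True only when supervisor, core and every add-on passed content trust."""
    parts = [result.supervisor, result.core, *result.addons.values()]
    return all(part == ContentTrustResult.PASS for part in parts)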

View File

@@ -1,15 +1,21 @@
"""Test evaluations."""
from unittest.mock import Mock
from unittest.mock import Mock, patch
from supervisor.const import CoreState
from supervisor.coresys import CoreSys
from supervisor.utils import check_exception_chain
async def test_evaluate_system_error(coresys: CoreSys, capture_exception: Mock):
"""Test error while evaluating system."""
await coresys.core.set_state(CoreState.RUNNING)
await coresys.resolution.evaluate.evaluate_system()
with patch(
"supervisor.resolution.evaluations.source_mods.calc_checksum_path_sourcecode",
side_effect=RuntimeError,
):
await coresys.resolution.evaluate.evaluate_system()
capture_exception.assert_not_called()
capture_exception.assert_called_once()
assert check_exception_chain(capture_exception.call_args[0][0], RuntimeError)

View File

@@ -0,0 +1,127 @@
"""Testing handling with Security."""
from unittest.mock import AsyncMock, patch
import pytest
from supervisor.coresys import CoreSys
from supervisor.exceptions import CodeNotaryError, CodeNotaryUntrusted
from supervisor.security.const import ContentTrustResult
async def test_content_trust(coresys: CoreSys):
"""Test Content-Trust."""
with patch("supervisor.security.module.cas_validate", AsyncMock()) as cas_validate:
await coresys.security.verify_content("test@mail.com", "ffffffffffffff")
assert cas_validate.called
cas_validate.assert_called_once_with("test@mail.com", "ffffffffffffff")
with patch(
"supervisor.security.module.cas_validate", AsyncMock()
) as cas_validate:
await coresys.security.verify_own_content("ffffffffffffff")
assert cas_validate.called
cas_validate.assert_called_once_with(
"notary@home-assistant.io", "ffffffffffffff"
)
async def test_disabled_content_trust(coresys: CoreSys):
"""Test Content-Trust."""
coresys.security.content_trust = False
with patch("supervisor.security.module.cas_validate", AsyncMock()) as cas_validate:
await coresys.security.verify_content("test@mail.com", "ffffffffffffff")
assert not cas_validate.called
with patch("supervisor.security.module.cas_validate", AsyncMock()) as cas_validate:
await coresys.security.verify_own_content("ffffffffffffff")
assert not cas_validate.called
async def test_force_content_trust(coresys: CoreSys):
"""Force Content-Trust tests."""
with patch(
"supervisor.security.module.cas_validate",
AsyncMock(side_effect=CodeNotaryError),
) as cas_validate:
await coresys.security.verify_content("test@mail.com", "ffffffffffffff")
assert cas_validate.called
cas_validate.assert_called_once_with("test@mail.com", "ffffffffffffff")
coresys.security.force = True
with (
patch(
"supervisor.security.module.cas_validate",
AsyncMock(side_effect=CodeNotaryError),
) as cas_validate,
pytest.raises(CodeNotaryError),
):
await coresys.security.verify_content("test@mail.com", "ffffffffffffff")
async def test_integrity_check_disabled(coresys: CoreSys):
"""Test integrity check with disabled content trust."""
coresys.security.content_trust = False
result = await coresys.security.integrity_check.__wrapped__(coresys.security)
assert result.core == ContentTrustResult.UNTESTED
assert result.supervisor == ContentTrustResult.UNTESTED
async def test_integrity_check(coresys: CoreSys, install_addon_ssh):
"""Test integrity check with content trust."""
coresys.homeassistant.core.check_trust = AsyncMock()
coresys.supervisor.check_trust = AsyncMock()
install_addon_ssh.check_trust = AsyncMock()
install_addon_ssh.data["codenotary"] = "test@example.com"
result = await coresys.security.integrity_check.__wrapped__(coresys.security)
assert result.core == ContentTrustResult.PASS
assert result.supervisor == ContentTrustResult.PASS
assert result.addons[install_addon_ssh.slug] == ContentTrustResult.PASS
async def test_integrity_check_error(coresys: CoreSys, install_addon_ssh):
"""Test integrity check with content trust issues."""
coresys.homeassistant.core.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
install_addon_ssh.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
install_addon_ssh.data["codenotary"] = "test@example.com"
result = await coresys.security.integrity_check.__wrapped__(coresys.security)
assert result.core == ContentTrustResult.ERROR
assert result.supervisor == ContentTrustResult.ERROR
assert result.addons[install_addon_ssh.slug] == ContentTrustResult.ERROR
async def test_integrity_check_failed(coresys: CoreSys, install_addon_ssh):
"""Test integrity check with content trust failed."""
coresys.homeassistant.core.check_trust = AsyncMock(side_effect=CodeNotaryError)
coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryError)
install_addon_ssh.check_trust = AsyncMock(side_effect=CodeNotaryError)
install_addon_ssh.data["codenotary"] = "test@example.com"
result = await coresys.security.integrity_check.__wrapped__(coresys.security)
assert result.core == ContentTrustResult.FAILED
assert result.supervisor == ContentTrustResult.FAILED
assert result.addons[install_addon_ssh.slug] == ContentTrustResult.FAILED
async def test_integrity_check_addon(coresys: CoreSys, install_addon_ssh):
"""Test integrity check with content trust but no signed add-ons."""
coresys.homeassistant.core.check_trust = AsyncMock()
coresys.supervisor.check_trust = AsyncMock()
result = await coresys.security.integrity_check.__wrapped__(coresys.security)
assert result.core == ContentTrustResult.PASS
assert result.supervisor == ContentTrustResult.PASS
assert result.addons[install_addon_ssh.slug] == ContentTrustResult.UNTESTED
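
The integrity-check tests above fix the mapping from trust-check outcomes to ContentTrustResult values. A hedged, self-contained sketch of that mapping (stand-in types, not the Supervisor code):

from enum import Enum

class CodeNotaryError(Exception): ...
class CodeNotaryUntrusted(CodeNotaryError): ...

class ContentTrustResult(str, Enum):  # values assumed; names match the tests
    PASS = "pass"
    ERROR = "error"
    FAILED = "failed"
    UNTESTED = "untested"  # content trust disabled or add-on unsigned

async def trust_result(check_trust) -> ContentTrustResult:
    """Map a check_trust() call to the result the tests above expect."""
    try:
        await check_trust()
    except CodeNotaryUntrusted:
        return ContentTrustResult.ERROR   # notarized but explicitly untrusted
    except CodeNotaryError:
        return ContentTrustResult.FAILED  # notary backend gave no answer
    return ContentTrustResult.PASS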

View File

@@ -86,9 +86,10 @@ async def test_os_update_path(
"""Test OS upgrade path across major versions."""
coresys.os._board = "rpi4" # pylint: disable=protected-access
coresys.os._version = AwesomeVersion(version) # pylint: disable=protected-access
await coresys.updater.fetch_data()
with patch.object(type(coresys.security), "verify_own_content"):
await coresys.updater.fetch_data()
assert coresys.updater.version_hassos == AwesomeVersion(expected)
assert coresys.updater.version_hassos == AwesomeVersion(expected)
@pytest.mark.usefixtures("no_job_throttle")
@@ -104,6 +105,7 @@ async def test_delayed_fetch_for_connectivity(
load_binary_fixture("version_stable.json")
)
coresys.websession.head = AsyncMock()
coresys.security.verify_own_content = AsyncMock()
# Network connectivity change causes a series of async tasks to eventually do a version fetch
# Rather than use some kind of sleep loop, set up a listener for the start of the fetch data job

View File

@@ -0,0 +1,128 @@
"""Test CodeNotary."""
from __future__ import annotations
from dataclasses import dataclass
from unittest.mock import AsyncMock, Mock, patch
import pytest
from supervisor.exceptions import (
CodeNotaryBackendError,
CodeNotaryError,
CodeNotaryUntrusted,
)
from supervisor.utils.codenotary import calc_checksum, cas_validate
pytest.skip("code notary has been disabled due to issues", allow_module_level=True)
@dataclass
class SubprocessResponse:
"""Class for specifying subprocess exec response."""
returncode: int = 0
data: str = ""
error: str | None = None
exception: Exception | None = None
@pytest.fixture(name="subprocess_exec")
def fixture_subprocess_exec(request):
"""Mock subprocess exec with specific return."""
response = request.param
if response.exception:
communicate_return = AsyncMock(side_effect=response.exception)
else:
communicate_return = AsyncMock(return_value=(response.data, response.error))
exec_return = Mock(returncode=response.returncode, communicate=communicate_return)
with patch(
"supervisor.utils.codenotary.asyncio.create_subprocess_exec",
return_value=exec_return,
) as subprocess_exec:
yield subprocess_exec
def test_checksum_calc():
"""Calc Checkusm as test."""
assert calc_checksum("test") == calc_checksum(b"test")
assert (
calc_checksum("test")
== "9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08"
)
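# The digest asserted above is the SHA-256 of "test", so calc_checksum is
# presumably a thin hashlib wrapper; a minimal sketch under that assumption
# (hypothetical, not necessarily supervisor.utils.codenotary's implementation):
import hashlib

def _calc_checksum_sketch(data: str | bytes) -> str:
    """Return the SHA-256 hex digest; str input is UTF-8 encoded first."""
    if isinstance(data, str):
        data = data.encode()
    return hashlib.sha256(data).hexdigest()

assert _calc_checksum_sketch("test") == calc_checksum("test")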
async def test_valid_checksum():
"""Test a valid autorization."""
await cas_validate(
"notary@home-assistant.io",
"4434a33ff9c695e870bc5bbe04230ea3361ecf4c129eb06133dd1373975a43f0",
)
async def test_invalid_checksum():
"""Test a invalid autorization."""
with pytest.raises(CodeNotaryUntrusted):
await cas_validate(
"notary@home-assistant.io",
"ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
)
@pytest.mark.parametrize(
    "subprocess_exec",
    [SubprocessResponse(returncode=1, error=b"x is not notarized")],
    indirect=True,
)
async def test_not_notarized_error(subprocess_exec):
"""Test received a not notarized error response from command."""
with pytest.raises(CodeNotaryUntrusted):
await cas_validate(
"notary@home-assistant.io",
"ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
)
@pytest.mark.parametrize(
"subprocess_exec",
[
SubprocessResponse(returncode=1, error=b"test"),
SubprocessResponse(returncode=0, data='{"error":"asn1: structure error"}'),
SubprocessResponse(returncode=1, error="test".encode("utf-16")),
],
indirect=True,
)
async def test_cas_backend_error(subprocess_exec):
"""Test backend error executing cas command."""
with pytest.raises(CodeNotaryBackendError):
await cas_validate(
"notary@home-assistant.io",
"ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
)
@pytest.mark.parametrize(
"subprocess_exec",
[SubprocessResponse(returncode=0, data='{"status":1}')],
indirect=True,
)
async def test_cas_notarized_untrusted(subprocess_exec):
"""Test cas found notarized but untrusted content."""
with pytest.raises(CodeNotaryUntrusted):
await cas_validate(
"notary@home-assistant.io",
"ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
)
@pytest.mark.parametrize(
"subprocess_exec", [SubprocessResponse(exception=OSError())], indirect=True
)
async def test_cas_exec_os_error(subprocess_exec):
"""Test os error attempting to execute cas command."""
with pytest.raises(CodeNotaryError):
await cas_validate(
"notary@home-assistant.io",
"ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
)