mirror of
https://github.com/home-assistant/supervisor.git
synced 2025-07-20 07:36:29 +00:00
System unsupported source modification (#2789)
Co-authored-by: Franck Nijhof <git@frenck.dev>
This commit is contained in:
parent
1fc0ab71aa
commit
62d198111c
@ -190,6 +190,7 @@ def initialize_system_data(coresys: CoreSys) -> None:
|
||||
_LOGGER.warning("Environment variables 'SUPERVISOR_DEV' is set")
|
||||
coresys.updater.channel = UpdateChannel.DEV
|
||||
coresys.config.logging = LogLevel.DEBUG
|
||||
coresys.config.content_trust = False
|
||||
coresys.config.debug = True
|
||||
|
||||
|
||||
|
@ -39,6 +39,7 @@ class UnsupportedReason(str, Enum):
|
||||
SYSTEMD = "systemd"
|
||||
JOB_CONDITIONS = "job_conditions"
|
||||
CONTENT_TRUST = "content_trust"
|
||||
SOURCE_MODS = "source_mods"
|
||||
|
||||
|
||||
class UnhealthyReason(str, Enum):
|
||||
|
52
supervisor/resolution/evaluations/source_mods.py
Normal file
52
supervisor/resolution/evaluations/source_mods.py
Normal file
@ -0,0 +1,52 @@
|
||||
"""Evaluation class for Content Trust."""
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from typing import List
|
||||
|
||||
from ...const import CoreState
|
||||
from ...coresys import CoreSys
|
||||
from ...exceptions import CodeNotaryError, CodeNotaryUntrusted
|
||||
from ..const import UnsupportedReason
|
||||
from .base import EvaluateBase
|
||||
|
||||
_SUPERVISOR_SOURCE = Path("/usr/src/supervisor")
|
||||
_LOGGER: logging.Logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def setup(coresys: CoreSys) -> EvaluateBase:
|
||||
"""Initialize evaluation-setup function."""
|
||||
return EvaluateSourceMods(coresys)
|
||||
|
||||
|
||||
class EvaluateSourceMods(EvaluateBase):
|
||||
"""Evaluate supervisor source modifications."""
|
||||
|
||||
@property
|
||||
def reason(self) -> UnsupportedReason:
|
||||
"""Return a UnsupportedReason enum."""
|
||||
return UnsupportedReason.SOURCE_MODS
|
||||
|
||||
@property
|
||||
def on_failure(self) -> str:
|
||||
"""Return a string that is printed when self.evaluate is False."""
|
||||
return "System detect unauthorized source code modifications."
|
||||
|
||||
@property
|
||||
def states(self) -> List[CoreState]:
|
||||
"""Return a list of valid states when this evaluation can run."""
|
||||
return [CoreState.RUNNING]
|
||||
|
||||
async def evaluate(self) -> None:
|
||||
"""Run evaluation."""
|
||||
if not self.sys_config.content_trust:
|
||||
_LOGGER.warning("Disabled content-trust, skipping evaluation")
|
||||
return
|
||||
|
||||
try:
|
||||
await self.sys_verify_content(path=_SUPERVISOR_SOURCE)
|
||||
except CodeNotaryUntrusted:
|
||||
return True
|
||||
except CodeNotaryError:
|
||||
pass
|
||||
|
||||
return False
|
@ -43,7 +43,7 @@ class ResolutionFixup(CoreSysAttributes):
|
||||
self._store_execute_remove,
|
||||
]
|
||||
|
||||
@Job(conditions=[JobCondition.HEALTHY])
|
||||
@Job(conditions=[JobCondition.HEALTHY, JobCondition.RUNNING])
|
||||
async def run_autofix(self) -> None:
|
||||
"""Run all startup fixes."""
|
||||
_LOGGER.info("Starting system autofix at state %s", self.sys_core.state)
|
||||
|
@ -2,8 +2,6 @@
|
||||
import logging
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from supervisor.const import CoreState
|
||||
|
||||
from ..coresys import CoreSys, CoreSysAttributes
|
||||
from ..exceptions import ResolutionError, ResolutionNotFound
|
||||
from ..utils.common import FileConfiguration
|
||||
@ -167,10 +165,10 @@ class ResolutionManager(FileConfiguration, CoreSysAttributes):
|
||||
async def healthcheck(self):
|
||||
"""Scheduled task to check for known issues."""
|
||||
await self.check.check_system()
|
||||
await self.evaluate.evaluate_system()
|
||||
|
||||
# Run autofix if possible
|
||||
if self.sys_core.state == CoreState.RUNNING:
|
||||
await self.fixup.run_autofix()
|
||||
await self.fixup.run_autofix()
|
||||
|
||||
# Create notification for any known issues
|
||||
await self.notify.issue_notifications()
|
||||
|
@ -50,7 +50,7 @@ async def vcn_validate(
|
||||
command.extend(["--hash", checksum])
|
||||
elif path:
|
||||
if path.is_dir:
|
||||
command.append(f"dir:/{path!s}")
|
||||
command.append(f"dir://{path.as_posix()}")
|
||||
else:
|
||||
command.append(path.as_posix())
|
||||
else:
|
||||
|
61
tests/resolution/evaluation/test_evaluate_source_mods.py
Normal file
61
tests/resolution/evaluation/test_evaluate_source_mods.py
Normal file
@ -0,0 +1,61 @@
|
||||
"""Test evaluation base."""
|
||||
# pylint: disable=import-error,protected-access
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
from supervisor.const import CoreState
|
||||
from supervisor.coresys import CoreSys
|
||||
from supervisor.exceptions import CodeNotaryError, CodeNotaryUntrusted
|
||||
from supervisor.resolution.evaluations.source_mods import EvaluateSourceMods
|
||||
|
||||
|
||||
async def test_evaluation(coresys: CoreSys):
|
||||
"""Test evaluation."""
|
||||
sourcemods = EvaluateSourceMods(coresys)
|
||||
coresys.core.state = CoreState.RUNNING
|
||||
|
||||
assert sourcemods.reason not in coresys.resolution.unsupported
|
||||
with patch(
|
||||
"supervisor.resolution.evaluations.source_mods.EvaluateSourceMods.sys_verify_content",
|
||||
AsyncMock(side_effect=CodeNotaryUntrusted),
|
||||
):
|
||||
await sourcemods()
|
||||
assert sourcemods.reason in coresys.resolution.unsupported
|
||||
|
||||
with patch(
|
||||
"supervisor.resolution.evaluations.source_mods.EvaluateSourceMods.sys_verify_content",
|
||||
AsyncMock(side_effect=CodeNotaryError),
|
||||
):
|
||||
await sourcemods()
|
||||
assert sourcemods.reason not in coresys.resolution.unsupported
|
||||
|
||||
with patch(
|
||||
"supervisor.resolution.evaluations.source_mods.EvaluateSourceMods.sys_verify_content",
|
||||
AsyncMock(),
|
||||
):
|
||||
await sourcemods()
|
||||
assert sourcemods.reason not in coresys.resolution.unsupported
|
||||
|
||||
|
||||
async def test_did_run(coresys: CoreSys):
|
||||
"""Test that the evaluation ran as expected."""
|
||||
sourcemods = EvaluateSourceMods(coresys)
|
||||
should_run = sourcemods.states
|
||||
should_not_run = [state for state in CoreState if state not in should_run]
|
||||
assert len(should_run) != 0
|
||||
assert len(should_not_run) != 0
|
||||
|
||||
with patch(
|
||||
"supervisor.resolution.evaluations.source_mods.EvaluateSourceMods.evaluate",
|
||||
return_value=None,
|
||||
) as evaluate:
|
||||
for state in should_run:
|
||||
coresys.core.state = state
|
||||
await sourcemods()
|
||||
evaluate.assert_called_once()
|
||||
evaluate.reset_mock()
|
||||
|
||||
for state in should_not_run:
|
||||
coresys.core.state = state
|
||||
await sourcemods()
|
||||
evaluate.assert_not_called()
|
||||
evaluate.reset_mock()
|
Loading…
x
Reference in New Issue
Block a user