Rework fixups & Add integrity (#3614)

* Rework fixups & Add integrity

* Fix tests

* fix test v2

* Protect ddos
This commit is contained in:
Pascal Vizeli 2022-05-05 16:21:19 +02:00 committed by GitHub
parent ae00ea178d
commit 942b5e6150
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 220 additions and 35 deletions

View File

@ -12,3 +12,4 @@ pytest-cov==3.0.0
pytest-timeout==2.1.0
pytest==7.1.2
pyupgrade==2.32.1
time-machine==2.6.0

View File

@ -89,4 +89,5 @@ class SuggestionType(str, Enum):
EXECUTE_RELOAD = "execute_reload"
EXECUTE_REMOVE = "execute_remove"
EXECUTE_STOP = "execute_stop"
EXECUTE_INTEGRITY = "execute_integrity"
REGISTRY_LOGIN = "registry_login"

View File

@ -1,4 +1,5 @@
"""Helpers to fixup the system."""
from importlib import import_module
import logging
from ..coresys import CoreSys, CoreSysAttributes
@ -6,11 +7,7 @@ from ..jobs.const import JobCondition
from ..jobs.decorator import Job
from .data import Suggestion
from .fixups.base import FixupBase
from .fixups.clear_full_backup import FixupClearFullBackup
from .fixups.create_full_backup import FixupCreateFullBackup
from .fixups.store_execute_reload import FixupStoreExecuteReload
from .fixups.store_execute_remove import FixupStoreExecuteRemove
from .fixups.store_execute_reset import FixupStoreExecuteReset
from .validate import get_valid_modules
_LOGGER: logging.Logger = logging.getLogger(__name__)
@ -21,26 +18,22 @@ class ResolutionFixup(CoreSysAttributes):
def __init__(self, coresys: CoreSys) -> None:
"""Initialize the suggestion class."""
self.coresys = coresys
self._fixups: dict[str, FixupBase] = {}
self._create_full_backup = FixupCreateFullBackup(coresys)
self._clear_full_backup = FixupClearFullBackup(coresys)
self._store_execute_reset = FixupStoreExecuteReset(coresys)
self._store_execute_reload = FixupStoreExecuteReload(coresys)
self._store_execute_remove = FixupStoreExecuteRemove(coresys)
self._load()
def _load(self):
"""Load all checks."""
package = f"{__package__}.fixups"
for module in get_valid_modules("fixups"):
fixup_module = import_module(f"{package}.{module}")
fixup = fixup_module.setup(self.coresys)
self._fixups[fixup.slug] = fixup
@property
def all_fixes(self) -> list[FixupBase]:
"""Return a list of all fixups.
Order can be important!
"""
return [
self._create_full_backup,
self._clear_full_backup,
self._store_execute_reload,
self._store_execute_reset,
self._store_execute_remove,
]
"""Return a list of all fixups."""
return list(self._fixups.values())
@Job(conditions=[JobCondition.HEALTHY, JobCondition.RUNNING])
async def run_autofix(self) -> None:

View File

@ -71,3 +71,8 @@ class FixupBase(ABC, CoreSysAttributes):
def auto(self) -> bool:
"""Return if a fixup can be apply as auto fix."""
return False
@property
def slug(self) -> str:
"""Return the check slug."""
return self.__class__.__module__.rsplit(".", maxsplit=1)[-1]

View File

@ -2,6 +2,7 @@
import logging
from typing import Optional
from ...coresys import CoreSys
from ...exceptions import (
ResolutionFixupError,
ResolutionFixupJobError,
@ -16,6 +17,11 @@ from .base import FixupBase
_LOGGER: logging.Logger = logging.getLogger(__name__)
def setup(coresys: CoreSys) -> FixupBase:
"""Check setup function."""
return FixupStoreExecuteReload(coresys)
class FixupStoreExecuteReload(FixupBase):
"""Storage class for fixup."""

View File

@ -2,14 +2,19 @@
import logging
from typing import Optional
from supervisor.exceptions import ResolutionFixupError, StoreError, StoreNotFound
from ...coresys import CoreSys
from ...exceptions import ResolutionFixupError, StoreError, StoreNotFound
from ..const import ContextType, IssueType, SuggestionType
from .base import FixupBase
_LOGGER: logging.Logger = logging.getLogger(__name__)
def setup(coresys: CoreSys) -> FixupBase:
"""Check setup function."""
return FixupStoreExecuteRemove(coresys)
class FixupStoreExecuteRemove(FixupBase):
"""Storage class for fixup."""

View File

@ -2,6 +2,7 @@
import logging
from typing import Optional
from ...coresys import CoreSys
from ...exceptions import (
ResolutionFixupError,
ResolutionFixupJobError,
@ -17,6 +18,11 @@ from .base import FixupBase
_LOGGER: logging.Logger = logging.getLogger(__name__)
def setup(coresys: CoreSys) -> FixupBase:
"""Check setup function."""
return FixupStoreExecuteReset(coresys)
class FixupStoreExecuteReset(FixupBase):
"""Storage class for fixup."""

View File

@ -3,13 +3,19 @@ import logging
from typing import Optional
from ...backups.const import BackupType
from ...coresys import CoreSys
from ..const import MINIMUM_FULL_BACKUPS, ContextType, IssueType, SuggestionType
from .base import FixupBase
_LOGGER: logging.Logger = logging.getLogger(__name__)
class FixupClearFullBackup(FixupBase):
def setup(coresys: CoreSys) -> FixupBase:
"""Check setup function."""
return FixupSystemClearFullBackup(coresys)
class FixupSystemClearFullBackup(FixupBase):
"""Storage class for fixup."""
async def process_fixup(self, reference: Optional[str] = None) -> None:

View File

@ -2,13 +2,19 @@
import logging
from typing import Optional
from ...coresys import CoreSys
from ..const import ContextType, SuggestionType
from .base import FixupBase
_LOGGER: logging.Logger = logging.getLogger(__name__)
class FixupCreateFullBackup(FixupBase):
def setup(coresys: CoreSys) -> FixupBase:
"""Check setup function."""
return FixupSystemCreateFullBackup(coresys)
class FixupSystemCreateFullBackup(FixupBase):
"""Storage class for fixup."""
async def process_fixup(self, reference: Optional[str] = None) -> None:

View File

@ -0,0 +1,66 @@
"""Helpers to check and fix issues with free space."""
from datetime import timedelta
import logging
from typing import Optional
from ...coresys import CoreSys
from ...exceptions import ResolutionFixupError, ResolutionFixupJobError
from ...jobs.const import JobCondition, JobExecutionLimit
from ...jobs.decorator import Job
from ...security.const import ContentTrustResult
from ..const import ContextType, IssueType, SuggestionType
from .base import FixupBase
_LOGGER: logging.Logger = logging.getLogger(__name__)
def setup(coresys: CoreSys) -> FixupBase:
"""Check setup function."""
return FixupSystemExecuteIntegrity(coresys)
class FixupSystemExecuteIntegrity(FixupBase):
"""Storage class for fixup."""
@Job(
conditions=[JobCondition.INTERNET_SYSTEM],
on_condition=ResolutionFixupJobError,
limit=JobExecutionLimit.THROTTLE,
throttle_period=timedelta(hours=8),
)
async def process_fixup(self, reference: Optional[str] = None) -> None:
"""Initialize the fixup class."""
result = await self.sys_security.integrity_check()
if ContentTrustResult.FAILED in (result.core, result.supervisor):
raise ResolutionFixupError()
for plugin in result.plugins:
if plugin != ContentTrustResult.FAILED:
continue
raise ResolutionFixupError()
for addon in result.addons:
if addon != ContentTrustResult.FAILED:
continue
raise ResolutionFixupError()
@property
def suggestion(self) -> SuggestionType:
"""Return a SuggestionType enum."""
return SuggestionType.EXECUTE_INTEGRITY
@property
def context(self) -> ContextType:
"""Return a ContextType enum."""
return ContextType.SYSTEM
@property
def issues(self) -> list[IssueType]:
"""Return a IssueType enum list."""
return [IssueType.TRUST]
@property
def auto(self) -> bool:
"""Return if a fixup can be apply as auto fix."""
return True

View File

@ -17,7 +17,7 @@ from ..exceptions import (
SecurityJobError,
)
from ..jobs.decorator import Job, JobCondition, JobExecutionLimit
from ..resolution.const import ContextType, IssueType
from ..resolution.const import ContextType, IssueType, SuggestionType
from ..utils.codenotary import cas_validate
from ..utils.common import FileConfiguration
from ..utils.pwned import check_pwned_password
@ -78,6 +78,11 @@ class Security(FileConfiguration, CoreSysAttributes):
except CodeNotaryError:
if self.force:
raise
self.sys_resolution.create_issue(
IssueType.TRUST,
ContextType.SYSTEM,
suggestions=[SuggestionType.EXECUTE_INTEGRITY],
)
return
async def verify_own_content(self, checksum: str) -> None:

View File

@ -6,30 +6,43 @@ from supervisor.const import CoreState
from supervisor.coresys import CoreSys
from supervisor.resolution.const import ContextType, SuggestionType
from supervisor.resolution.data import Suggestion
from supervisor.resolution.validate import get_valid_modules
async def test_check_autofix(coresys: CoreSys):
"""Test check for setup."""
coresys.core.state = CoreState.RUNNING
coresys.resolution.fixup._create_full_backup.process_fixup = AsyncMock()
coresys.resolution.fixup._fixups[
"system_create_full_backup"
].process_fixup = AsyncMock()
with patch(
"supervisor.resolution.fixups.create_full_backup.FixupCreateFullBackup.auto",
"supervisor.resolution.fixups.system_create_full_backup.FixupSystemCreateFullBackup.auto",
return_value=True,
):
await coresys.resolution.fixup.run_autofix()
coresys.resolution.fixup._create_full_backup.process_fixup.assert_not_called()
coresys.resolution.fixup._fixups[
"system_create_full_backup"
].process_fixup.assert_not_called()
coresys.resolution.suggestions = Suggestion(
SuggestionType.CREATE_FULL_BACKUP, ContextType.SYSTEM
)
with patch(
"supervisor.resolution.fixups.create_full_backup.FixupCreateFullBackup.auto",
"supervisor.resolution.fixups.system_create_full_backup.FixupSystemCreateFullBackup.auto",
return_value=True,
):
await coresys.resolution.fixup.run_autofix()
coresys.resolution.fixup._create_full_backup.process_fixup.assert_called_once()
coresys.resolution.fixup._fixups[
"system_create_full_backup"
].process_fixup.assert_called_once()
assert len(coresys.resolution.suggestions) == 0
def test_dynamic_fixup_loader(coresys: CoreSys):
"""Test dynamic fixup loader, this ensures that all fixups have defined a setup function."""
for fixup in get_valid_modules("fixups"):
assert fixup in coresys.resolution.fixup._fixups

View File

@ -10,13 +10,15 @@ from supervisor.const import ATTR_DATE, ATTR_SLUG, ATTR_TYPE
from supervisor.coresys import CoreSys
from supervisor.resolution.const import ContextType, SuggestionType
from supervisor.resolution.data import Suggestion
from supervisor.resolution.fixups.clear_full_backup import FixupClearFullBackup
from supervisor.resolution.fixups.system_clear_full_backup import (
FixupSystemClearFullBackup,
)
from supervisor.utils.dt import utcnow
async def test_fixup(coresys: CoreSys, tmp_path):
"""Test fixup."""
clear_full_backup = FixupClearFullBackup(coresys)
clear_full_backup = FixupSystemClearFullBackup(coresys)
assert not clear_full_backup.auto

View File

@ -5,12 +5,14 @@ from unittest.mock import AsyncMock
from supervisor.coresys import CoreSys
from supervisor.resolution.const import ContextType, SuggestionType
from supervisor.resolution.data import Suggestion
from supervisor.resolution.fixups.create_full_backup import FixupCreateFullBackup
from supervisor.resolution.fixups.system_create_full_backup import (
FixupSystemCreateFullBackup,
)
async def test_fixup(coresys: CoreSys):
"""Test fixup."""
create_full_backup = FixupCreateFullBackup(coresys)
create_full_backup = FixupSystemCreateFullBackup(coresys)
assert not create_full_backup.auto

View File

@ -0,0 +1,68 @@
"""Test evaluation base."""
# pylint: disable=import-error,protected-access
from datetime import timedelta
from unittest.mock import AsyncMock
import time_machine
from supervisor.coresys import CoreSys
from supervisor.resolution.const import ContextType, IssueType, SuggestionType
from supervisor.resolution.data import Issue, Suggestion
from supervisor.resolution.fixups.system_execute_integrity import (
FixupSystemExecuteIntegrity,
)
from supervisor.security.const import ContentTrustResult, IntegrityResult
from supervisor.utils.dt import utcnow
async def test_fixup(coresys: CoreSys):
"""Test fixup."""
system_execute_integrity = FixupSystemExecuteIntegrity(coresys)
assert system_execute_integrity.auto
coresys.resolution.suggestions = Suggestion(
SuggestionType.EXECUTE_INTEGRITY, ContextType.SYSTEM
)
coresys.resolution.issues = Issue(IssueType.TRUST, ContextType.SYSTEM)
coresys.security.integrity_check = AsyncMock(
return_value=IntegrityResult(
ContentTrustResult.PASS,
ContentTrustResult.PASS,
{"audio": ContentTrustResult.PASS},
)
)
await system_execute_integrity()
assert coresys.security.integrity_check.called
assert len(coresys.resolution.suggestions) == 0
assert len(coresys.resolution.issues) == 0
async def test_fixup_error(coresys: CoreSys):
"""Test fixup."""
system_execute_integrity = FixupSystemExecuteIntegrity(coresys)
assert system_execute_integrity.auto
coresys.resolution.suggestions = Suggestion(
SuggestionType.EXECUTE_INTEGRITY, ContextType.SYSTEM
)
coresys.resolution.issues = Issue(IssueType.TRUST, ContextType.SYSTEM)
coresys.security.integrity_check = AsyncMock(
return_value=IntegrityResult(
ContentTrustResult.FAILED,
ContentTrustResult.PASS,
{"audio": ContentTrustResult.PASS},
)
)
with time_machine.travel(utcnow() + timedelta(hours=24)):
await system_execute_integrity()
assert coresys.security.integrity_check.called
assert len(coresys.resolution.suggestions) == 1
assert len(coresys.resolution.issues) == 1