mirror of
https://github.com/home-assistant/supervisor.git
synced 2025-07-12 11:46:31 +00:00
Dynamically load resolution checks (#2716)
* Dynamically load resouces checks * address comment * Apply suggestions from code review Co-authored-by: Pascal Vizeli <pvizeli@syshack.ch> * Update supervisor/resolution/check.py * fix check Co-authored-by: Pascal Vizeli <pvizeli@syshack.ch>
This commit is contained in:
parent
55382d000b
commit
059233c111
@ -1,14 +1,13 @@
|
||||
"""Helpers to checks the system."""
|
||||
from importlib import import_module
|
||||
import logging
|
||||
from typing import Any, Dict, List
|
||||
|
||||
from ..const import ATTR_CHECKS
|
||||
from ..coresys import CoreSys, CoreSysAttributes
|
||||
from ..exceptions import ResolutionNotFound
|
||||
from .checks.addon_pwned import CheckAddonPwned
|
||||
from .checks.base import CheckBase
|
||||
from .checks.core_security import CheckCoreSecurity
|
||||
from .checks.free_space import CheckFreeSpace
|
||||
from .validate import get_valid_modules
|
||||
|
||||
_LOGGER: logging.Logger = logging.getLogger(__name__)
|
||||
|
||||
@ -19,9 +18,8 @@ class ResolutionCheck(CoreSysAttributes):
|
||||
def __init__(self, coresys: CoreSys) -> None:
|
||||
"""Initialize the checks class."""
|
||||
self.coresys = coresys
|
||||
self._core_security = CheckCoreSecurity(coresys)
|
||||
self._free_space = CheckFreeSpace(coresys)
|
||||
self._addon_pwned = CheckAddonPwned(coresys)
|
||||
self._checks: Dict[str, CheckBase] = {}
|
||||
self._load()
|
||||
|
||||
@property
|
||||
def data(self) -> Dict[str, Any]:
|
||||
@ -31,14 +29,21 @@ class ResolutionCheck(CoreSysAttributes):
|
||||
@property
|
||||
def all_checks(self) -> List[CheckBase]:
|
||||
"""Return all list of all checks."""
|
||||
return [self._core_security, self._free_space, self._addon_pwned]
|
||||
return list(self._checks.values())
|
||||
|
||||
def _load(self):
|
||||
"""Load all checks."""
|
||||
package = f"{__package__}.checks"
|
||||
for module in get_valid_modules("checks"):
|
||||
check_module = import_module(f"{package}.{module}")
|
||||
check = check_module.setup(self.coresys)
|
||||
self._checks[check.slug] = check
|
||||
|
||||
def get(self, slug: str) -> CheckBase:
|
||||
"""Return check based on slug."""
|
||||
for check in self.all_checks:
|
||||
if slug != check.slug:
|
||||
continue
|
||||
return check
|
||||
if slug in self._checks:
|
||||
return self._checks[slug]
|
||||
|
||||
raise ResolutionNotFound(f"Check with slug {slug} not found!")
|
||||
|
||||
async def check_system(self) -> None:
|
||||
|
@ -4,6 +4,7 @@ from datetime import timedelta
|
||||
from typing import List, Optional
|
||||
|
||||
from ...const import AddonState, CoreState
|
||||
from ...coresys import CoreSys
|
||||
from ...exceptions import PwnedConnectivityError, PwnedError
|
||||
from ...jobs.const import JobCondition, JobExecutionLimit
|
||||
from ...jobs.decorator import Job
|
||||
@ -12,6 +13,11 @@ from ..const import ContextType, IssueType, SuggestionType
|
||||
from .base import CheckBase
|
||||
|
||||
|
||||
def setup(coresys: CoreSys) -> "CheckAddonPwned":
|
||||
"""Check setup function."""
|
||||
return CheckAddonPwned(coresys)
|
||||
|
||||
|
||||
class CheckAddonPwned(CheckBase):
|
||||
"""CheckAddonPwned class for check."""
|
||||
|
||||
|
@ -6,10 +6,16 @@ from typing import List, Optional
|
||||
from awesomeversion import AwesomeVersion, AwesomeVersionException
|
||||
|
||||
from ...const import CoreState
|
||||
from ...coresys import CoreSys
|
||||
from ..const import ContextType, IssueType, SuggestionType
|
||||
from .base import CheckBase
|
||||
|
||||
|
||||
def setup(coresys: CoreSys) -> "CheckCoreSecurity":
|
||||
"""Check setup function."""
|
||||
return CheckCoreSecurity(coresys)
|
||||
|
||||
|
||||
class SecurityReference(str, Enum):
|
||||
"""Version references."""
|
||||
|
||||
|
@ -2,6 +2,7 @@
|
||||
from typing import List, Optional
|
||||
|
||||
from ...const import SNAPSHOT_FULL, CoreState
|
||||
from ...coresys import CoreSys
|
||||
from ..const import (
|
||||
MINIMUM_FREE_SPACE_THRESHOLD,
|
||||
MINIMUM_FULL_SNAPSHOTS,
|
||||
@ -13,6 +14,11 @@ from ..data import Suggestion
|
||||
from .base import CheckBase
|
||||
|
||||
|
||||
def setup(coresys: CoreSys) -> "CheckFreeSpace":
|
||||
"""Check setup function."""
|
||||
return CheckFreeSpace(coresys)
|
||||
|
||||
|
||||
class CheckFreeSpace(CheckBase):
|
||||
"""Storage class for check."""
|
||||
|
||||
|
@ -7,7 +7,7 @@ import voluptuous as vol
|
||||
from ..const import ATTR_CHECKS, ATTR_ENABLED
|
||||
|
||||
|
||||
def _get_valid_modules(folder) -> List[str]:
|
||||
def get_valid_modules(folder) -> List[str]:
|
||||
"""Validate check name."""
|
||||
module_files = Path(__file__).parent.joinpath(folder)
|
||||
if not module_files.exists():
|
||||
@ -30,7 +30,7 @@ SCHEMA_CHECK_CONFIG = vol.Schema(
|
||||
SCHEMA_CHECKS_CONFIG = vol.Schema(
|
||||
{
|
||||
vol.Required(check, default=SCHEMA_CHECK_CONFIG({})): SCHEMA_CHECK_CONFIG
|
||||
for check in _get_valid_modules("checks")
|
||||
for check in get_valid_modules("checks")
|
||||
},
|
||||
extra=vol.REMOVE_EXTRA,
|
||||
)
|
||||
|
@ -8,6 +8,7 @@ from supervisor.const import CoreState
|
||||
from supervisor.coresys import CoreSys
|
||||
from supervisor.exceptions import ResolutionNotFound
|
||||
from supervisor.resolution.const import IssueType
|
||||
from supervisor.resolution.validate import get_valid_modules
|
||||
|
||||
|
||||
async def test_check_setup(coresys: CoreSys):
|
||||
@ -86,3 +87,10 @@ async def test_get_checks(coresys: CoreSys):
|
||||
coresys.resolution.check.get("does_not_exsist")
|
||||
|
||||
assert coresys.resolution.check.get("free_space")
|
||||
|
||||
|
||||
def test_dynamic_check_loader(coresys: CoreSys):
|
||||
"""Test dynamic check loader, this ensures that all checks have defined a setup function."""
|
||||
coresys.resolution.check._load()
|
||||
for check in get_valid_modules("checks"):
|
||||
assert check in coresys.resolution.check._checks
|
||||
|
Loading…
x
Reference in New Issue
Block a user