mirror of
https://github.com/home-assistant/supervisor.git
synced 2025-07-13 20:26:29 +00:00
Check management (#2703)
* Check management * Add test * Don't allow disable core_security * options and decorator * streamline config handling * streamline v2 * fix logging * Add tests * Fix test * cleanup v1 * fix api * Add more test * Expose option also for cli * address comments from Paulus * Address second comment * Update supervisor/resolution/checks/base.py Co-authored-by: Paulus Schoutsen <balloob@gmail.com> * fix lint * Fix black Co-authored-by: Pascal Vizeli <pvizeli@syshack.ch> Co-authored-by: Paulus Schoutsen <balloob@gmail.com>
This commit is contained in:
parent
a52713611c
commit
73849b7468
@ -228,6 +228,9 @@ class RestAPI(CoreSysAttributes):
|
||||
self.webapp.add_routes(
|
||||
[
|
||||
web.get("/resolution/info", api_resolution.info),
|
||||
web.post(
|
||||
"/resolution/check/{check}/options", api_resolution.options_check
|
||||
),
|
||||
web.post(
|
||||
"/resolution/suggestion/{suggestion}",
|
||||
api_resolution.apply_suggestion,
|
||||
|
@ -13,7 +13,7 @@ from ..const import (
|
||||
from ..coresys import CoreSysAttributes
|
||||
from ..discovery.validate import valid_discovery_service
|
||||
from ..exceptions import APIError, APIForbidden
|
||||
from .utils import api_process, api_validate
|
||||
from .utils import api_process, api_validate, require_home_assistant
|
||||
|
||||
SCHEMA_DISCOVERY = vol.Schema(
|
||||
{
|
||||
@ -33,15 +33,10 @@ class APIDiscovery(CoreSysAttributes):
|
||||
raise APIError("Discovery message not found")
|
||||
return message
|
||||
|
||||
def _check_permission_ha(self, request):
|
||||
"""Check permission for API call / Home Assistant."""
|
||||
if request[REQUEST_FROM] != self.sys_homeassistant:
|
||||
raise APIForbidden("Only HomeAssistant can use this API!")
|
||||
|
||||
@api_process
|
||||
@require_home_assistant
|
||||
async def list(self, request):
|
||||
"""Show register services."""
|
||||
self._check_permission_ha(request)
|
||||
|
||||
# Get available discovery
|
||||
discovery = []
|
||||
@ -79,13 +74,11 @@ class APIDiscovery(CoreSysAttributes):
|
||||
return {ATTR_UUID: message.uuid}
|
||||
|
||||
@api_process
|
||||
@require_home_assistant
|
||||
async def get_discovery(self, request):
|
||||
"""Read data into a discovery message."""
|
||||
message = self._extract_message(request)
|
||||
|
||||
# HomeAssistant?
|
||||
self._check_permission_ha(request)
|
||||
|
||||
return {
|
||||
ATTR_ADDON: message.addon,
|
||||
ATTR_SERVICE: message.service,
|
||||
|
@ -25,10 +25,9 @@ from ..const import (
|
||||
COOKIE_INGRESS,
|
||||
HEADER_TOKEN,
|
||||
HEADER_TOKEN_OLD,
|
||||
REQUEST_FROM,
|
||||
)
|
||||
from ..coresys import CoreSysAttributes
|
||||
from .utils import api_process, api_validate
|
||||
from .utils import api_process, api_validate, require_home_assistant
|
||||
|
||||
_LOGGER: logging.Logger = logging.getLogger(__name__)
|
||||
|
||||
@ -50,11 +49,6 @@ class APIIngress(CoreSysAttributes):
|
||||
|
||||
return addon
|
||||
|
||||
def _check_ha_access(self, request: web.Request) -> None:
|
||||
if request[REQUEST_FROM] != self.sys_homeassistant:
|
||||
_LOGGER.warning("Ingress is only available behind Home Assistant")
|
||||
raise HTTPUnauthorized()
|
||||
|
||||
def _create_url(self, addon: Addon, path: str) -> str:
|
||||
"""Create URL to container."""
|
||||
return f"http://{addon.ip_address}:{addon.ingress_port}/{path}"
|
||||
@ -74,18 +68,16 @@ class APIIngress(CoreSysAttributes):
|
||||
return {ATTR_PANELS: addons}
|
||||
|
||||
@api_process
|
||||
@require_home_assistant
|
||||
async def create_session(self, request: web.Request) -> Dict[str, Any]:
|
||||
"""Create a new session."""
|
||||
self._check_ha_access(request)
|
||||
|
||||
session = self.sys_ingress.create_session()
|
||||
return {ATTR_SESSION: session}
|
||||
|
||||
@api_process
|
||||
@require_home_assistant
|
||||
async def validate_session(self, request: web.Request) -> Dict[str, Any]:
|
||||
"""Validate session and extending how long it's valid for."""
|
||||
self._check_ha_access(request)
|
||||
|
||||
data = await api_validate(VALIDATE_SESSION_DATA, request)
|
||||
|
||||
# Check Ingress Session
|
||||
@ -93,11 +85,11 @@ class APIIngress(CoreSysAttributes):
|
||||
_LOGGER.warning("No valid ingress session %s", data[ATTR_SESSION])
|
||||
raise HTTPUnauthorized()
|
||||
|
||||
@require_home_assistant
|
||||
async def handler(
|
||||
self, request: web.Request
|
||||
) -> Union[web.Response, web.StreamResponse, web.WebSocketResponse]:
|
||||
"""Route data to Supervisor ingress service."""
|
||||
self._check_ha_access(request)
|
||||
|
||||
# Check Ingress Session
|
||||
session = request.cookies.get(COOKIE_INGRESS)
|
||||
|
@ -4,11 +4,22 @@ from typing import Any, Awaitable, Dict
|
||||
|
||||
from aiohttp import web
|
||||
import attr
|
||||
import voluptuous as vol
|
||||
|
||||
from ..const import ATTR_ISSUES, ATTR_SUGGESTIONS, ATTR_UNHEALTHY, ATTR_UNSUPPORTED
|
||||
from ..const import (
|
||||
ATTR_CHECKS,
|
||||
ATTR_ENABLED,
|
||||
ATTR_ISSUES,
|
||||
ATTR_SLUG,
|
||||
ATTR_SUGGESTIONS,
|
||||
ATTR_UNHEALTHY,
|
||||
ATTR_UNSUPPORTED,
|
||||
)
|
||||
from ..coresys import CoreSysAttributes
|
||||
from ..exceptions import APIError, ResolutionNotFound
|
||||
from .utils import api_process
|
||||
from .utils import api_process, api_validate
|
||||
|
||||
SCHEMA_CHECK_OPTIONS = vol.Schema({vol.Optional(ATTR_ENABLED): bool})
|
||||
|
||||
|
||||
class APIResoulution(CoreSysAttributes):
|
||||
@ -25,6 +36,10 @@ class APIResoulution(CoreSysAttributes):
|
||||
for suggestion in self.sys_resolution.suggestions
|
||||
],
|
||||
ATTR_ISSUES: [attr.asdict(issue) for issue in self.sys_resolution.issues],
|
||||
ATTR_CHECKS: [
|
||||
{ATTR_ENABLED: check.enabled, ATTR_SLUG: check.slug}
|
||||
for check in self.sys_resolution.check.all_checks
|
||||
],
|
||||
}
|
||||
|
||||
@api_process
|
||||
@ -62,3 +77,19 @@ class APIResoulution(CoreSysAttributes):
|
||||
def healthcheck(self, request: web.Request) -> Awaitable[None]:
|
||||
"""Run backend healthcheck."""
|
||||
return asyncio.shield(self.sys_resolution.healthcheck())
|
||||
|
||||
@api_process
|
||||
async def options_check(self, request: web.Request) -> None:
|
||||
"""Set options for check."""
|
||||
body = await api_validate(SCHEMA_CHECK_OPTIONS, request)
|
||||
|
||||
try:
|
||||
check = self.sys_resolution.check.get(request.match_info.get("check"))
|
||||
except ResolutionNotFound:
|
||||
raise APIError("The supplied check slug is not available") from None
|
||||
|
||||
# Apply options
|
||||
if ATTR_ENABLED in body:
|
||||
check.enabled = body[ATTR_ENABLED]
|
||||
|
||||
self.sys_resolution.save_data()
|
||||
|
@ -4,6 +4,8 @@ from typing import Any, Dict, List, Optional
|
||||
|
||||
from aiohttp import web
|
||||
from aiohttp.hdrs import AUTHORIZATION
|
||||
from aiohttp.web_exceptions import HTTPUnauthorized
|
||||
from aiohttp.web_request import Request
|
||||
import voluptuous as vol
|
||||
from voluptuous.humanize import humanize_error
|
||||
|
||||
@ -14,9 +16,11 @@ from ..const import (
|
||||
JSON_DATA,
|
||||
JSON_MESSAGE,
|
||||
JSON_RESULT,
|
||||
REQUEST_FROM,
|
||||
RESULT_ERROR,
|
||||
RESULT_OK,
|
||||
)
|
||||
from ..coresys import CoreSys
|
||||
from ..exceptions import APIError, APIForbidden, DockerAPIError, HassioError
|
||||
from ..utils import check_exception_chain, get_message_from_exception_chain
|
||||
from ..utils.json import JSONEncoder
|
||||
@ -73,6 +77,20 @@ def api_process(method):
|
||||
return wrap_api
|
||||
|
||||
|
||||
def require_home_assistant(method):
|
||||
"""Ensure that the request comes from Home Assistant."""
|
||||
|
||||
async def wrap_api(api, *args, **kwargs):
|
||||
"""Return API information."""
|
||||
coresys: CoreSys = api.coresys
|
||||
request: Request = args[0]
|
||||
if request[REQUEST_FROM] != coresys.homeassistant:
|
||||
raise HTTPUnauthorized()
|
||||
return await method(api, *args, **kwargs)
|
||||
|
||||
return wrap_api
|
||||
|
||||
|
||||
def api_process_raw(content):
|
||||
"""Wrap content_type into function."""
|
||||
|
||||
|
@ -118,6 +118,7 @@ ATTR_CARD = "card"
|
||||
ATTR_CHANGELOG = "changelog"
|
||||
ATTR_CHANNEL = "channel"
|
||||
ATTR_CHASSIS = "chassis"
|
||||
ATTR_CHECKS = "checks"
|
||||
ATTR_CLI = "cli"
|
||||
ATTR_CONFIG = "config"
|
||||
ATTR_CONFIGURATION = "configuration"
|
||||
|
@ -355,6 +355,10 @@ class ResolutionError(HassioError):
|
||||
"""Raise if an error is happning on resoltuion."""
|
||||
|
||||
|
||||
class ResolutionCheckError(ResolutionError):
|
||||
"""Raise when there are an issue managing checks."""
|
||||
|
||||
|
||||
class ResolutionNotFound(ResolutionError):
|
||||
"""Raise if suggestion/issue was not found."""
|
||||
|
||||
|
@ -1,8 +1,10 @@
|
||||
"""Helpers to checks the system."""
|
||||
import logging
|
||||
from typing import List
|
||||
from typing import Any, Dict, List
|
||||
|
||||
from ..const import ATTR_CHECKS
|
||||
from ..coresys import CoreSys, CoreSysAttributes
|
||||
from ..exceptions import ResolutionNotFound
|
||||
from .checks.addon_pwned import CheckAddonPwned
|
||||
from .checks.base import CheckBase
|
||||
from .checks.core_security import CheckCoreSecurity
|
||||
@ -17,25 +19,40 @@ class ResolutionCheck(CoreSysAttributes):
|
||||
def __init__(self, coresys: CoreSys) -> None:
|
||||
"""Initialize the checks class."""
|
||||
self.coresys = coresys
|
||||
|
||||
self._core_security = CheckCoreSecurity(coresys)
|
||||
self._free_space = CheckFreeSpace(coresys)
|
||||
self._addon_pwned = CheckAddonPwned(coresys)
|
||||
|
||||
@property
|
||||
def all_tests(self) -> List[CheckBase]:
|
||||
def data(self) -> Dict[str, Any]:
|
||||
"""Return data."""
|
||||
return self.sys_resolution.data[ATTR_CHECKS]
|
||||
|
||||
@property
|
||||
def all_checks(self) -> List[CheckBase]:
|
||||
"""Return all list of all checks."""
|
||||
return [self._core_security, self._free_space, self._addon_pwned]
|
||||
|
||||
def get(self, slug: str) -> CheckBase:
|
||||
"""Return check based on slug."""
|
||||
for check in self.all_checks:
|
||||
if slug != check.slug:
|
||||
continue
|
||||
return check
|
||||
raise ResolutionNotFound(f"Check with slug {slug} not found!")
|
||||
|
||||
async def check_system(self) -> None:
|
||||
"""Check the system."""
|
||||
_LOGGER.info("Starting system checks with state %s", self.sys_core.state)
|
||||
|
||||
for test in self.all_tests:
|
||||
for check in self.all_checks:
|
||||
if not check.enabled:
|
||||
_LOGGER.warning("Skipping disabled check %s", check.slug)
|
||||
continue
|
||||
try:
|
||||
await test()
|
||||
await check()
|
||||
except Exception as err: # pylint: disable=broad-except
|
||||
_LOGGER.warning("Error during processing %s: %s", test.issue, err)
|
||||
_LOGGER.error("Error during processing %s: %s", check.issue, err)
|
||||
self.sys_capture_exception(err)
|
||||
|
||||
_LOGGER.info("System checks complete")
|
||||
|
@ -3,7 +3,7 @@ from abc import ABC, abstractmethod, abstractproperty
|
||||
import logging
|
||||
from typing import List, Optional
|
||||
|
||||
from ...const import CoreState
|
||||
from ...const import ATTR_ENABLED, CoreState
|
||||
from ...coresys import CoreSys, CoreSysAttributes
|
||||
from ..const import ContextType, IssueType
|
||||
|
||||
@ -41,11 +41,16 @@ class CheckBase(ABC, CoreSysAttributes):
|
||||
self.sys_resolution.dismiss_issue(issue)
|
||||
|
||||
# System is not affected
|
||||
if affected and not self.multiple:
|
||||
if affected and self.context not in (ContextType.ADDON, ContextType.PLUGIN):
|
||||
return
|
||||
_LOGGER.info("Run check for %s/%s", self.issue, self.context)
|
||||
await self.run_check()
|
||||
|
||||
@property
|
||||
def slug(self) -> str:
|
||||
"""Return the check slug."""
|
||||
return self.__class__.__module__.split(".")[-1]
|
||||
|
||||
@abstractmethod
|
||||
async def run_check(self) -> None:
|
||||
"""Run check if not affected by issue."""
|
||||
@ -64,12 +69,17 @@ class CheckBase(ABC, CoreSysAttributes):
|
||||
def context(self) -> ContextType:
|
||||
"""Return a ContextType enum."""
|
||||
|
||||
@property
|
||||
def multiple(self) -> bool:
|
||||
"""Return True if they can have multiple issues referenced by reference."""
|
||||
return self.context in (ContextType.ADDON, ContextType.PLUGIN)
|
||||
|
||||
@property
|
||||
def states(self) -> List[CoreState]:
|
||||
"""Return a list of valid states when this check can run."""
|
||||
return []
|
||||
|
||||
@property
|
||||
def enabled(self) -> bool:
|
||||
"""Return True if the check is enabled."""
|
||||
return self.sys_resolution.check.data[self.slug][ATTR_ENABLED]
|
||||
|
||||
@enabled.setter
|
||||
def enabled(self, value: bool) -> None:
|
||||
"""Enable or disbable check."""
|
||||
self.sys_resolution.check.data[self.slug][ATTR_ENABLED] = value
|
||||
|
@ -1,5 +1,4 @@
|
||||
"""Helpers to check and fix issues with free space."""
|
||||
import logging
|
||||
from typing import List, Optional
|
||||
|
||||
from ...const import SNAPSHOT_FULL, CoreState
|
||||
@ -13,8 +12,6 @@ from ..const import (
|
||||
from ..data import Suggestion
|
||||
from .base import CheckBase
|
||||
|
||||
_LOGGER: logging.Logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class CheckFreeSpace(CheckBase):
|
||||
"""Storage class for check."""
|
||||
|
@ -1,5 +1,10 @@
|
||||
"""Constants for the resoulution manager."""
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
|
||||
from ..const import SUPERVISOR_DATA
|
||||
|
||||
FILE_CONFIG_RESOLUTION = Path(SUPERVISOR_DATA, "resolution.json")
|
||||
|
||||
SCHEDULED_HEALTHCHECK = 3600
|
||||
|
||||
|
@ -1,13 +1,15 @@
|
||||
"""Supervisor resolution center."""
|
||||
import logging
|
||||
from typing import List, Optional
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from supervisor.const import CoreState
|
||||
|
||||
from ..coresys import CoreSys, CoreSysAttributes
|
||||
from ..exceptions import ResolutionError, ResolutionNotFound
|
||||
from ..utils.common import FileConfiguration
|
||||
from .check import ResolutionCheck
|
||||
from .const import (
|
||||
FILE_CONFIG_RESOLUTION,
|
||||
SCHEDULED_HEALTHCHECK,
|
||||
ContextType,
|
||||
IssueType,
|
||||
@ -19,15 +21,18 @@ from .data import Issue, Suggestion
|
||||
from .evaluate import ResolutionEvaluation
|
||||
from .fixup import ResolutionFixup
|
||||
from .notify import ResolutionNotify
|
||||
from .validate import SCHEMA_RESOLUTION_CONFIG
|
||||
|
||||
_LOGGER: logging.Logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ResolutionManager(CoreSysAttributes):
|
||||
class ResolutionManager(FileConfiguration, CoreSysAttributes):
|
||||
"""Resolution manager for supervisor."""
|
||||
|
||||
def __init__(self, coresys: CoreSys):
|
||||
"""Initialize Resolution manager."""
|
||||
super().__init__(FILE_CONFIG_RESOLUTION, SCHEMA_RESOLUTION_CONFIG)
|
||||
|
||||
self.coresys: CoreSys = coresys
|
||||
self._evaluate = ResolutionEvaluation(coresys)
|
||||
self._check = ResolutionCheck(coresys)
|
||||
@ -39,6 +44,11 @@ class ResolutionManager(CoreSysAttributes):
|
||||
self._unsupported: List[UnsupportedReason] = []
|
||||
self._unhealthy: List[UnhealthyReason] = []
|
||||
|
||||
@property
|
||||
def data(self) -> Dict[str, Any]:
|
||||
"""Return data."""
|
||||
return self._data
|
||||
|
||||
@property
|
||||
def evaluate(self) -> ResolutionEvaluation:
|
||||
"""Return the ResolutionEvaluation class."""
|
||||
|
45
supervisor/resolution/validate.py
Normal file
45
supervisor/resolution/validate.py
Normal file
@ -0,0 +1,45 @@
|
||||
"""Validate resolution configuration schema."""
|
||||
from pathlib import Path
|
||||
from typing import List
|
||||
|
||||
import voluptuous as vol
|
||||
|
||||
from ..const import ATTR_CHECKS, ATTR_ENABLED
|
||||
|
||||
|
||||
def _get_valid_modules(folder) -> List[str]:
|
||||
"""Validate check name."""
|
||||
module_files = Path(__file__).parent.joinpath(folder)
|
||||
if not module_files.exists():
|
||||
raise vol.Invalid(f"Module folder '{folder}' not found!")
|
||||
|
||||
return [
|
||||
module.stem
|
||||
for module in module_files.glob("*.py")
|
||||
if module.name not in ("base.py", "__init__.py")
|
||||
]
|
||||
|
||||
|
||||
SCHEMA_CHECK_CONFIG = vol.Schema(
|
||||
{
|
||||
vol.Required(ATTR_ENABLED, default=True): bool,
|
||||
},
|
||||
extra=vol.REMOVE_EXTRA,
|
||||
)
|
||||
|
||||
SCHEMA_CHECKS_CONFIG = vol.Schema(
|
||||
{
|
||||
vol.Required(check, default=SCHEMA_CHECK_CONFIG({})): SCHEMA_CHECK_CONFIG
|
||||
for check in _get_valid_modules("checks")
|
||||
},
|
||||
extra=vol.REMOVE_EXTRA,
|
||||
)
|
||||
|
||||
SCHEMA_RESOLUTION_CONFIG = vol.Schema(
|
||||
{
|
||||
vol.Required(
|
||||
ATTR_CHECKS, default=SCHEMA_CHECKS_CONFIG({})
|
||||
): SCHEMA_CHECKS_CONFIG,
|
||||
},
|
||||
extra=vol.REMOVE_EXTRA,
|
||||
)
|
@ -6,40 +6,35 @@ import pytest
|
||||
# pylint: disable=redefined-outer-name
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def stub_auth():
|
||||
"""Bypass auth check."""
|
||||
with patch("supervisor.api.ingress.APIIngress._check_ha_access") as mock_auth:
|
||||
yield mock_auth
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_validate_session(stub_auth, api_client, coresys):
|
||||
async def test_validate_session(api_client, coresys):
|
||||
"""Test validating ingress session."""
|
||||
coresys.core.sys_homeassistant.supervisor_token = "ABCD"
|
||||
resp = await api_client.post(
|
||||
"/ingress/validate_session",
|
||||
json={"session": "non-existing"},
|
||||
)
|
||||
assert resp.status == 401
|
||||
assert len(stub_auth.mock_calls) == 1
|
||||
with patch("aiohttp.web_request.BaseRequest.__getitem__", return_value=None):
|
||||
resp = await api_client.post(
|
||||
"/ingress/validate_session",
|
||||
json={"session": "non-existing"},
|
||||
)
|
||||
assert resp.status == 401
|
||||
|
||||
resp = await api_client.post("/ingress/session")
|
||||
result = await resp.json()
|
||||
assert len(stub_auth.mock_calls) == 2
|
||||
with patch(
|
||||
"aiohttp.web_request.BaseRequest.__getitem__",
|
||||
return_value=coresys.homeassistant,
|
||||
):
|
||||
|
||||
assert "session" in result["data"]
|
||||
session = result["data"]["session"]
|
||||
assert session in coresys.ingress.sessions
|
||||
resp = await api_client.post("/ingress/session")
|
||||
result = await resp.json()
|
||||
|
||||
valid_time = coresys.ingress.sessions[session]
|
||||
assert "session" in result["data"]
|
||||
session = result["data"]["session"]
|
||||
assert session in coresys.ingress.sessions
|
||||
|
||||
resp = await api_client.post(
|
||||
"/ingress/validate_session",
|
||||
json={"session": session},
|
||||
)
|
||||
assert resp.status == 200
|
||||
assert len(stub_auth.mock_calls) == 3
|
||||
assert await resp.json() == {"result": "ok", "data": {}}
|
||||
valid_time = coresys.ingress.sessions[session]
|
||||
|
||||
assert coresys.ingress.sessions[session] > valid_time
|
||||
resp = await api_client.post(
|
||||
"/ingress/validate_session",
|
||||
json={"session": session},
|
||||
)
|
||||
assert resp.status == 200
|
||||
assert await resp.json() == {"result": "ok", "data": {}}
|
||||
|
||||
assert coresys.ingress.sessions[session] > valid_time
|
||||
|
@ -100,3 +100,20 @@ async def test_api_resolution_unhealthy(coresys: CoreSys, api_client):
|
||||
resp = await api_client.get("/resolution/info")
|
||||
result = await resp.json()
|
||||
assert UnhealthyReason.DOCKER == result["data"][ATTR_UNHEALTHY][-1]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_api_resolution_check_options(coresys: CoreSys, api_client):
|
||||
"""Test client API with checks options."""
|
||||
free_space = coresys.resolution.check.get("free_space")
|
||||
|
||||
assert free_space.enabled
|
||||
await api_client.post(
|
||||
f"/resolution/check/{free_space.slug}/options", json={"enabled": False}
|
||||
)
|
||||
assert not free_space.enabled
|
||||
|
||||
await api_client.post(
|
||||
f"/resolution/check/{free_space.slug}/options", json={"enabled": True}
|
||||
)
|
||||
assert free_space.enabled
|
||||
|
@ -11,6 +11,7 @@ import pytest
|
||||
|
||||
from supervisor.api import RestAPI
|
||||
from supervisor.bootstrap import initialize_coresys
|
||||
from supervisor.const import REQUEST_FROM
|
||||
from supervisor.coresys import CoreSys
|
||||
from supervisor.dbus.network import NetworkManager
|
||||
from supervisor.docker import DockerAPI
|
||||
@ -141,6 +142,7 @@ async def coresys(loop, docker, network_manager, aiohttp_client) -> CoreSys:
|
||||
coresys_obj._updater.save_data = MagicMock()
|
||||
coresys_obj._config.save_data = MagicMock()
|
||||
coresys_obj._jobs.save_data = MagicMock()
|
||||
coresys_obj._resolution.save_data = MagicMock()
|
||||
|
||||
# Mock test client
|
||||
coresys_obj.arch._default_arch = "amd64"
|
||||
@ -186,8 +188,15 @@ def sys_supervisor():
|
||||
@pytest.fixture
|
||||
async def api_client(aiohttp_client, coresys: CoreSys):
|
||||
"""Fixture for RestAPI client."""
|
||||
|
||||
@web.middleware
|
||||
async def _security_middleware(request: web.Request, handler: web.RequestHandler):
|
||||
"""Make request are from Core."""
|
||||
request[REQUEST_FROM] = coresys.homeassistant
|
||||
return await handler(request)
|
||||
|
||||
api = RestAPI(coresys)
|
||||
api.webapp = web.Application()
|
||||
api.webapp = web.Application(middlewares=[_security_middleware])
|
||||
api.start = AsyncMock()
|
||||
await api.load()
|
||||
yield await aiohttp_client(api.webapp)
|
||||
|
@ -1,9 +1,12 @@
|
||||
"""Test check."""
|
||||
# pylint: disable=import-error
|
||||
# pylint: disable=import-error,protected-access
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from supervisor.const import CoreState
|
||||
from supervisor.coresys import CoreSys
|
||||
from supervisor.exceptions import ResolutionNotFound
|
||||
from supervisor.resolution.const import IssueType
|
||||
|
||||
|
||||
@ -52,3 +55,34 @@ async def test_if_check_cleanup_issue(coresys: CoreSys):
|
||||
await coresys.resolution.check.check_system()
|
||||
|
||||
assert len(coresys.resolution.issues) == 0
|
||||
|
||||
|
||||
async def test_enable_disable_checks(coresys: CoreSys):
|
||||
"""Test enable and disable check."""
|
||||
coresys.core.state = CoreState.RUNNING
|
||||
free_space = coresys.resolution.check.get("free_space")
|
||||
|
||||
# Ensure the check was enabled
|
||||
assert free_space.enabled
|
||||
|
||||
free_space.enabled = False
|
||||
assert not free_space.enabled
|
||||
|
||||
with patch(
|
||||
"supervisor.resolution.checks.free_space.CheckFreeSpace.run_check",
|
||||
return_value=False,
|
||||
) as free_space:
|
||||
await coresys.resolution.check.check_system()
|
||||
free_space.assert_not_called()
|
||||
|
||||
free_space.enabled = True
|
||||
assert free_space.enabled
|
||||
|
||||
|
||||
async def test_get_checks(coresys: CoreSys):
|
||||
"""Test get check with slug."""
|
||||
|
||||
with pytest.raises(ResolutionNotFound):
|
||||
coresys.resolution.check.get("does_not_exsist")
|
||||
|
||||
assert coresys.resolution.check.get("free_space")
|
||||
|
@ -17,6 +17,13 @@ class TestAddon:
|
||||
is_installed = True
|
||||
|
||||
|
||||
async def test_base(coresys: CoreSys):
|
||||
"""Test check basics."""
|
||||
addon_pwned = CheckAddonPwned(coresys)
|
||||
assert addon_pwned.slug == "addon_pwned"
|
||||
assert addon_pwned.enabled
|
||||
|
||||
|
||||
async def test_check(coresys: CoreSys):
|
||||
"""Test check."""
|
||||
addon_pwned = CheckAddonPwned(coresys)
|
||||
|
@ -11,6 +11,13 @@ from supervisor.resolution.checks.core_security import CheckCoreSecurity
|
||||
from supervisor.resolution.const import IssueType
|
||||
|
||||
|
||||
async def test_base(coresys: CoreSys):
|
||||
"""Test check basics."""
|
||||
core_security = CheckCoreSecurity(coresys)
|
||||
assert core_security.slug == "core_security"
|
||||
assert core_security.enabled
|
||||
|
||||
|
||||
async def test_check(coresys: CoreSys, tmp_path):
|
||||
"""Test check."""
|
||||
with patch("supervisor.config.CoreConfig.path_homeassistant", tmp_path):
|
||||
|
@ -8,6 +8,13 @@ from supervisor.resolution.checks.free_space import CheckFreeSpace
|
||||
from supervisor.resolution.const import IssueType
|
||||
|
||||
|
||||
async def test_base(coresys: CoreSys):
|
||||
"""Test check basics."""
|
||||
free_space = CheckFreeSpace(coresys)
|
||||
assert free_space.slug == "free_space"
|
||||
assert free_space.enabled
|
||||
|
||||
|
||||
async def test_check(coresys: CoreSys):
|
||||
"""Test check."""
|
||||
free_space = CheckFreeSpace(coresys)
|
||||
|
Loading…
x
Reference in New Issue
Block a user