Add JobManager and Job decorator (#2225)

* Adds condition decorator to block execution that require internet

* Fix exsisting tests

* Add internet state to network info

* Add healthy condition

* Add tests

* It's all changed

* rename
This commit is contained in:
Joakim Sørensen 2020-11-12 22:57:28 +01:00 committed by GitHub
parent cd34a40dd8
commit a18b706f99
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 474 additions and 33 deletions

View File

@ -41,6 +41,7 @@ setup(
"supervisor.docker",
"supervisor.homeassistant",
"supervisor.host",
"supervisor.job",
"supervisor.misc",
"supervisor.plugins",
"supervisor.resolution.evaluations",

View File

@ -17,6 +17,7 @@ from ..const import (
ATTR_ENABLED,
ATTR_FREQUENCY,
ATTR_GATEWAY,
ATTR_HOST_CONNECTIVITY,
ATTR_INTERFACE,
ATTR_INTERFACES,
ATTR_IPV4,
@ -29,6 +30,7 @@ from ..const import (
ATTR_PSK,
ATTR_SIGNAL,
ATTR_SSID,
ATTR_SUPERVISOR_INTERNET,
ATTR_TYPE,
ATTR_VLAN,
ATTR_WIFI,
@ -159,6 +161,8 @@ class APINetwork(CoreSysAttributes):
ATTR_GATEWAY: str(self.sys_docker.network.gateway),
ATTR_DNS: str(self.sys_docker.network.dns),
},
ATTR_HOST_CONNECTIVITY: self.sys_host.network.connectivity,
ATTR_SUPERVISOR_INTERNET: self.sys_supervisor.connectivity,
}
@api_process

View File

@ -10,6 +10,8 @@ import sentry_sdk
from sentry_sdk.integrations.aiohttp import AioHttpIntegration
from sentry_sdk.integrations.logging import LoggingIntegration
from supervisor.job import JobManager
from .addons import AddonManager
from .api import RestAPI
from .arch import CpuArch
@ -55,6 +57,7 @@ async def initialize_coresys() -> CoreSys:
# Initialize core objects
coresys.resolution = ResolutionManager(coresys)
coresys.jobs = JobManager(coresys)
coresys.core = Core(coresys)
coresys.plugins = PluginManager(coresys)
coresys.arch = CpuArch(coresys)

View File

@ -151,6 +151,7 @@ ATTR_HEALTHY = "healthy"
ATTR_HOMEASSISTANT = "homeassistant"
ATTR_HOMEASSISTANT_API = "homeassistant_api"
ATTR_HOST = "host"
ATTR_HOST_CONNECTIVITY = "host_connectivity"
ATTR_HOST_DBUS = "host_dbus"
ATTR_HOST_IPC = "host_ipc"
ATTR_HOST_NETWORK = "host_network"
@ -247,6 +248,7 @@ ATTR_STDIN = "stdin"
ATTR_STORAGE = "storage"
ATTR_SUGGESTIONS = "suggestions"
ATTR_SUPERVISOR = "supervisor"
ATTR_SUPERVISOR_INTERNET = "supervisor_internet"
ATTR_SUPPORTED = "supported"
ATTR_SUPPORTED_ARCH = "supported_arch"
ATTR_SYSTEM = "system"

View File

@ -6,6 +6,8 @@ from typing import Awaitable, List, Optional
import async_timeout
from supervisor.host.const import ConnectivityState
from .const import RUN_SUPERVISOR_STATE, AddonStartup, CoreState
from .coresys import CoreSys, CoreSysAttributes
from .exceptions import (
@ -57,6 +59,13 @@ class Core(CoreSysAttributes):
# Load information from container
await self.sys_supervisor.load()
# Check internet on startup
if not self.sys_host.network.connectivity == ConnectivityState.FULL:
await self.sys_host.network.check_connectivity()
if not self.sys_supervisor.connectivity:
await self.sys_supervisor.check_connectivity()
# Evaluate the system
await self.sys_resolution.evaluate.evaluate_system()

View File

@ -22,19 +22,20 @@ if TYPE_CHECKING:
from .dbus import DBusManager
from .discovery import Discovery
from .hassos import HassOS
from .misc.scheduler import Scheduler
from .misc.hwmon import HwMonitor
from .misc.tasks import Tasks
from .homeassistant import HomeAssistant
from .host import HostManager
from .ingress import Ingress
from .services import ServiceManager
from .snapshots import SnapshotManager
from .supervisor import Supervisor
from .store import StoreManager
from .updater import Updater
from .job import JobManager
from .misc.hwmon import HwMonitor
from .misc.scheduler import Scheduler
from .misc.tasks import Tasks
from .plugins import PluginManager
from .resolution import ResolutionManager
from .services import ServiceManager
from .snapshots import SnapshotManager
from .store import StoreManager
from .supervisor import Supervisor
from .updater import Updater
T = TypeVar("T")
@ -83,6 +84,7 @@ class CoreSys:
self._hwmonitor: Optional[HwMonitor] = None
self._plugins: Optional[PluginManager] = None
self._resolution: Optional[ResolutionManager] = None
self._jobs: Optional[JobManager] = None
@property
def dev(self) -> bool:
@ -413,6 +415,20 @@ class CoreSys:
raise RuntimeError("resolution manager already set!")
self._resolution = value
@property
def jobs(self) -> JobManager:
"""Return resolution manager object."""
if self._jobs is None:
raise RuntimeError("job manager not set!")
return self._jobs
@jobs.setter
def jobs(self, value: JobManager) -> None:
"""Set a resolution manager object."""
if self._jobs:
raise RuntimeError("job manager already set!")
self._jobs = value
@property
def machine(self) -> Optional[str]:
"""Return machine type string."""
@ -588,6 +604,11 @@ class CoreSysAttributes:
"""Return Resolution manager object."""
return self.coresys.resolution
@property
def sys_jobs(self) -> JobManager:
"""Return Job manager object."""
return self.coresys.jobs
def sys_run_in_executor(
self, funct: Callable[..., T], *args: Any
) -> Coroutine[Any, Any, T]:

View File

@ -65,6 +65,11 @@ class NetworkManager(DBusInterface):
settings, device_object, DBUS_OBJECT_BASE
)
@dbus_connected
async def check_connectivity(self) -> Awaitable[Any]:
"""Check the connectivity of the host."""
return await self.dbus.CheckConnectivity()
async def connect(self) -> None:
"""Connect to system's D-Bus."""
try:

View File

@ -293,3 +293,10 @@ class ResolutionError(HassioError):
class ResolutionNotFound(ResolutionError):
"""Raise if suggestion/issue was not found."""
# Job
class JobException(HassioError):
"""Base job exception."""

View File

@ -33,3 +33,13 @@ class WifiMode(str, Enum):
MESH = "mesh"
ADHOC = "adhoc"
AP = "ap"
class ConnectivityState(int, Enum):
"""Connectivity State."""
UNKNOWN = 0
NONE = 1
PORTAL = 2
LIMITED = 3
FULL = 4

View File

@ -27,7 +27,13 @@ from ..exceptions import (
HostNetworkNotFound,
HostNotSupportedError,
)
from .const import AuthMethod, InterfaceMethod, InterfaceType, WifiMode
from .const import (
AuthMethod,
ConnectivityState,
InterfaceMethod,
InterfaceType,
WifiMode,
)
_LOGGER: logging.Logger = logging.getLogger(__name__)
@ -38,6 +44,12 @@ class NetworkManager(CoreSysAttributes):
def __init__(self, coresys: CoreSys):
"""Initialize system center handling."""
self.coresys: CoreSys = coresys
self._connectivity = ConnectivityState.FULL
@property
def connectivity(self) -> ConnectivityState:
"""Return true current connectivity state."""
return self._connectivity
@property
def interfaces(self) -> List[Interface]:
@ -60,6 +72,14 @@ class NetworkManager(CoreSysAttributes):
return list(dict.fromkeys(servers))
async def check_connectivity(self):
"""Check the internet connection."""
try:
state = await self.sys_dbus.network.check_connectivity()
self._connectivity = ConnectivityState(state[0])
except (DBusError, IndexError):
self._connectivity = ConnectivityState.UNKNOWN
def get(self, inet_name: str) -> Interface:
"""Return interface from interface name."""
if inet_name not in self.sys_dbus.network.interfaces:

View File

@ -0,0 +1,76 @@
"""Supervisor job manager."""
import logging
from typing import Dict, List, Optional
from ..coresys import CoreSys, CoreSysAttributes
_LOGGER: logging.Logger = logging.getLogger(__package__)
class SupervisorJob(CoreSysAttributes):
"""Supervisor running job class."""
def __init__(self, coresys: CoreSys, name: str):
"""Initialize the JobManager class."""
self.coresys: CoreSys = coresys
self.name: str = name
self._progress: int = 0
self._stage: Optional[str] = None
@property
def progress(self) -> int:
"""Return the current progress."""
return self._progress
@property
def stage(self) -> Optional[str]:
"""Return the current stage."""
return self._stage
async def update(
self, progress: Optional[int] = None, stage: Optional[str] = None
) -> None:
"""Update the job object."""
if progress is not None:
if progress >= round(100):
self.sys_jobs.remove_job(self)
return
self._progress = round(progress)
if stage is not None:
self._stage = stage
_LOGGER.debug(
"Job updated; name: %s, progress: %s, stage: %s",
self.name,
self.progress,
self.stage,
)
class JobManager(CoreSysAttributes):
"""Job class."""
def __init__(self, coresys: CoreSys):
"""Initialize the JobManager class."""
self.coresys: CoreSys = coresys
self._jobs: Dict[str, SupervisorJob] = {}
@property
def jobs(self) -> List[SupervisorJob]:
"""Return a list of current jobs."""
return self._jobs
def get_job(self, name: str) -> SupervisorJob:
"""Return a job, create one if it does not exsist."""
if name not in self._jobs:
self._jobs[name] = SupervisorJob(self.coresys, name)
return self._jobs[name]
def remove_job(self, job: SupervisorJob) -> None:
"""Remove a job."""
if job.name in self._jobs:
del self._jobs[job.name]
def clear(self) -> None:
"""Clear all jobs."""
self._jobs.clear()

110
supervisor/job/decorator.py Normal file
View File

@ -0,0 +1,110 @@
"""Job decorator."""
from enum import Enum
import logging
from typing import List, Optional
from ..const import CoreState
from ..coresys import CoreSys
from ..exceptions import HassioError, JobException
from ..host.const import ConnectivityState
from ..resolution.const import MINIMUM_FREE_SPACE_THRESHOLD, ContextType, IssueType
_LOGGER: logging.Logger = logging.getLogger(__package__)
class JobCondition(str, Enum):
"""Job condition enum."""
FREE_SPACE = "free_space"
HEALTHY = "healthy"
INTERNET = "internet"
class Job:
"""Supervisor job decorator."""
def __init__(
self,
name: Optional[str] = None,
conditions: Optional[List[JobCondition]] = None,
cleanup: bool = True,
):
"""Initialize the Job class."""
self.name = name
self.conditions = conditions
self.cleanup = cleanup
self._coresys: Optional[CoreSys] = None
self._method = None
def __call__(self, method):
"""Call the wrapper logic."""
self._method = method
async def wrapper(*args, **kwargs):
"""Wrap the method."""
if self.name is None:
self.name = str(self._method.__qualname__).lower().replace(".", "_")
try:
self._coresys = args[0].coresys
except AttributeError:
return False
if not self._coresys:
raise JobException(f"coresys is missing on {self.name}")
job = self._coresys.jobs.get_job(self.name)
if self.conditions and not await self._check_conditions():
return False
try:
result = await self._method(*args, **kwargs)
except HassioError as err:
_LOGGER.error(err)
raise JobException() from err
finally:
if self.cleanup:
self._coresys.jobs.remove_job(job)
return result
return wrapper
async def _check_conditions(self):
"""Check conditions."""
if JobCondition.HEALTHY in self.conditions:
if not self._coresys.core.healthy:
_LOGGER.warning(
"'%s' blocked from execution, system is not healthy",
self._method.__qualname__,
)
return False
if JobCondition.FREE_SPACE in self.conditions:
free_space = self._coresys.host.info.free_space
if free_space < MINIMUM_FREE_SPACE_THRESHOLD:
_LOGGER.warning(
"'%s' blocked from execution, not enough free space (%sGB) left on the device",
self._method.__qualname__,
free_space,
)
self._coresys.resolution.create_issue(
IssueType.FREE_SPACE, ContextType.SYSTEM
)
return False
if JobCondition.INTERNET in self.conditions:
if self._coresys.core.state == CoreState.RUNNING:
await self._coresys.host.network.check_connectivity()
await self._coresys.supervisor.check_connectivity()
if (
not self._coresys.supervisor.connectivity
or self._coresys.host.network.connectivity != ConnectivityState.FULL
):
_LOGGER.warning(
"'%s' blocked from execution, no internet connection",
self._method.__qualname__,
)
return False
return True

View File

@ -12,7 +12,7 @@ from ..exceptions import (
MulticastError,
ObserverError,
)
from ..resolution.const import MINIMUM_FREE_SPACE_THRESHOLD, ContextType, IssueType
from ..job.decorator import Job, JobCondition
_LOGGER: logging.Logger = logging.getLogger(__name__)
@ -113,6 +113,7 @@ class Tasks(CoreSysAttributes):
_LOGGER.info("All core tasks are scheduled")
@Job(conditions=[JobCondition.HEALTHY, JobCondition.FREE_SPACE])
async def _update_addons(self):
"""Check if an update is available for an Add-on and update it."""
for addon in self.sys_addons.all:
@ -128,17 +129,6 @@ class Tasks(CoreSysAttributes):
)
continue
# Check free space
if self.sys_host.info.free_space < MINIMUM_FREE_SPACE_THRESHOLD:
_LOGGER.warning(
"Not enough free space, pausing add-on updates - available space %f",
self.sys_host.info.free_space,
)
self.sys_resolution.create_issue(
IssueType.FREE_SPACE, ContextType.SYSTEM
)
return
# Run Add-on update sequential
# avoid issue on slow IO
_LOGGER.info("Add-on auto update process %s", addon.slug)
@ -147,20 +137,12 @@ class Tasks(CoreSysAttributes):
except AddonsError:
_LOGGER.error("Can't auto update Add-on %s", addon.slug)
@Job(conditions=[JobCondition.HEALTHY, JobCondition.FREE_SPACE])
async def _update_supervisor(self):
"""Check and run update of Supervisor Supervisor."""
if not self.sys_supervisor.need_update:
return
# Check free space
if self.sys_host.info.free_space < MINIMUM_FREE_SPACE_THRESHOLD:
_LOGGER.warning(
"Not enough free space, pausing supervisor update - available space %s",
self.sys_host.info.free_space,
)
self.sys_resolution.create_issue(IssueType.FREE_SPACE, ContextType.SYSTEM)
return
_LOGGER.info(
"Found new Supervisor version %s, updating",
self.sys_supervisor.latest_version,

View File

@ -12,6 +12,7 @@ from supervisor.utils.json import read_json_file
from ..const import REPOSITORY_CORE, REPOSITORY_LOCAL
from ..coresys import CoreSys, CoreSysAttributes
from ..exceptions import JsonFileError
from ..job.decorator import Job, JobCondition
from .addon import AddonStore
from .data import StoreData
from .repository import Repository
@ -52,17 +53,22 @@ class StoreManager(CoreSysAttributes):
await asyncio.wait(tasks)
# read data from repositories
self.data.update()
await self.load()
self._read_addons()
@Job(conditions=[JobCondition.INTERNET, JobCondition.HEALTHY])
async def update_repositories(self, list_repositories):
"""Add a new custom repository."""
job = self.sys_jobs.get_job("storemanager_update_repositories")
new_rep = set(list_repositories)
old_rep = set(self.repositories)
# add new repository
async def _add_repository(url):
async def _add_repository(url: str, step: int):
"""Add a repository."""
await job.update(
progress=job.progress + step, stage=f"Checking {url} started"
)
repository = Repository(self.coresys, url)
if not await repository.load():
_LOGGER.error("Can't load data from repository %s", url)
@ -85,7 +91,9 @@ class StoreManager(CoreSysAttributes):
self.repositories[url] = repository
tasks = [_add_repository(url) for url in new_rep - old_rep]
await job.update(progress=10, stage="Check repositories")
repos = new_rep - old_rep
tasks = [_add_repository(url, 80 / len(repos)) for url in repos]
if tasks:
await asyncio.wait(tasks)
@ -95,9 +103,14 @@ class StoreManager(CoreSysAttributes):
self.sys_config.drop_addon_repository(url)
# update data
await job.update(progress=90, stage="Update addons")
self.data.update()
await job.update(progress=95, stage="Read addons")
self._read_addons()
await job.update(progress=100)
def _read_addons(self) -> None:
"""Reload add-ons inside store."""
all_addons = set(self.data.addons)

View File

@ -8,6 +8,7 @@ from tempfile import TemporaryDirectory
from typing import Awaitable, Optional
import aiohttp
from aiohttp.client_exceptions import ClientError
from packaging.version import parse as pkg_parse
from .const import SUPERVISOR_VERSION, URL_HASSIO_APPARMOR
@ -32,6 +33,7 @@ class Supervisor(CoreSysAttributes):
"""Initialize hass object."""
self.coresys: CoreSys = coresys
self.instance: DockerSupervisor = DockerSupervisor(coresys)
self._connectivity: bool = False
async def load(self) -> None:
"""Prepare Home Assistant object."""
@ -43,6 +45,11 @@ class Supervisor(CoreSysAttributes):
with suppress(DockerError):
await self.instance.cleanup()
@property
def connectivity(self) -> bool:
"""Return true if we are connected to the internet."""
return self._connectivity
@property
def ip_address(self) -> IPv4Address:
"""Return IP of Supervisor instance."""
@ -166,3 +173,14 @@ class Supervisor(CoreSysAttributes):
await self.instance.retag()
except DockerError:
_LOGGER.error("Repair of Supervisor failed")
async def check_connectivity(self):
"""Check the connection."""
try:
await self.sys_websession.head(
"https://version.home-assistant.io/online.txt", timeout=10
)
except (ClientError, asyncio.TimeoutError):
self._connectivity = False
else:
self._connectivity = True

View File

@ -25,6 +25,7 @@ from .const import (
)
from .coresys import CoreSysAttributes
from .exceptions import HassioUpdaterError
from .job.decorator import Job, JobCondition
from .utils import AsyncThrottle
from .utils.json import JsonConfig
from .validate import SCHEMA_UPDATER_CONFIG
@ -158,6 +159,7 @@ class Updater(JsonConfig, CoreSysAttributes):
self._data[ATTR_CHANNEL] = value
@AsyncThrottle(timedelta(seconds=30))
@Job(conditions=[JobCondition.HEALTHY])
async def fetch_data(self):
"""Fetch current versions from Github.

View File

@ -13,6 +13,7 @@ from supervisor.bootstrap import initialize_coresys
from supervisor.coresys import CoreSys
from supervisor.dbus.network import NetworkManager
from supervisor.docker import DockerAPI
from supervisor.host.const import ConnectivityState
from supervisor.utils.gdbus import DBus
from tests.common import exists_fixture, load_fixture, load_json_fixture
@ -136,6 +137,10 @@ async def coresys(loop, docker, network_manager, aiohttp_client) -> CoreSys:
# Mock docker
coresys_obj._docker = docker
# Set internet state
coresys_obj.supervisor._connectivity = True
coresys_obj.host.network._connectivity = ConnectivityState.FULL
yield coresys_obj

View File

@ -0,0 +1,41 @@
"""Test supported features."""
# pylint: disable=protected-access
from unittest.mock import patch
from supervisor.coresys import CoreSys
from supervisor.host.const import ConnectivityState
async def test_connectivity_unknown(coresys: CoreSys):
"""Test host unknown connectivity."""
with patch("supervisor.utils.gdbus.DBus._send", return_value="[0]"):
await coresys.host.network.check_connectivity()
assert coresys.host.network.connectivity == ConnectivityState.UNKNOWN
async def test_connectivity_none(coresys: CoreSys):
"""Test host none connectivity."""
with patch("supervisor.utils.gdbus.DBus._send", return_value="[1]"):
await coresys.host.network.check_connectivity()
assert coresys.host.network.connectivity == ConnectivityState.NONE
async def test_connectivity_portal(coresys: CoreSys):
"""Test host portal connectivity."""
with patch("supervisor.utils.gdbus.DBus._send", return_value="[2]"):
await coresys.host.network.check_connectivity()
assert coresys.host.network.connectivity == ConnectivityState.PORTAL
async def test_connectivity_limited(coresys: CoreSys):
"""Test host limited connectivity."""
with patch("supervisor.utils.gdbus.DBus._send", return_value="[3]"):
await coresys.host.network.check_connectivity()
assert coresys.host.network.connectivity == ConnectivityState.LIMITED
async def test_connectivity_full(coresys: CoreSys):
"""Test host full connectivity."""
with patch("supervisor.utils.gdbus.DBus._send", return_value="[4]"):
await coresys.host.network.check_connectivity()
assert coresys.host.network.connectivity == ConnectivityState.FULL

View File

@ -0,0 +1,73 @@
"""Test the condition decorators."""
# pylint: disable=protected-access,import-error
from unittest.mock import patch
from supervisor.coresys import CoreSys
from supervisor.job.decorator import Job, JobCondition
async def test_healthy(coresys: CoreSys):
"""Test the healty decorator."""
class TestClass:
"""Test class."""
def __init__(self, coresys: CoreSys):
"""Initialize the test class."""
self.coresys = coresys
@Job(conditions=[JobCondition.HEALTHY])
async def execute(self):
"""Execute the class method."""
return True
test = TestClass(coresys)
assert await test.execute()
coresys.core.healthy = False
assert not await test.execute()
async def test_internet(coresys: CoreSys):
"""Test the internet decorator."""
class TestClass:
"""Test class."""
def __init__(self, coresys: CoreSys):
"""Initialize the test class."""
self.coresys = coresys
@Job(conditions=[JobCondition.INTERNET])
async def execute(self):
"""Execute the class method."""
return True
test = TestClass(coresys)
assert await test.execute()
coresys.supervisor._connectivity = False
assert not await test.execute()
async def test_free_space(coresys: CoreSys):
"""Test the free_space decorator."""
class TestClass:
"""Test class."""
def __init__(self, coresys: CoreSys):
"""Initialize the test class."""
self.coresys = coresys
@Job(conditions=[JobCondition.FREE_SPACE])
async def execute(self):
"""Execute the class method."""
return True
test = TestClass(coresys)
with patch("shutil.disk_usage", return_value=(42, 42, (1024.0 ** 3))):
assert await test.execute()
with patch("shutil.disk_usage", return_value=(42, 42, (512.0 ** 3))):
assert not await test.execute()

View File

@ -0,0 +1,39 @@
"""Test the condition decorators."""
# pylint: disable=protected-access,import-error
from supervisor.coresys import CoreSys
TEST_JOB = "test"
async def test_add_job(coresys: CoreSys):
"""Test adding jobs."""
job = coresys.jobs.get_job(TEST_JOB)
assert job.name in coresys.jobs.jobs
async def test_remove_job_directly(coresys: CoreSys):
"""Test removing jobs from manager."""
job = coresys.jobs.get_job(TEST_JOB)
assert job.name in coresys.jobs.jobs
coresys.jobs.remove_job(job)
assert job.name not in coresys.jobs.jobs
async def test_remove_job_with_progress(coresys: CoreSys):
"""Test removing jobs by setting progress to 100."""
job = coresys.jobs.get_job(TEST_JOB)
assert job.name in coresys.jobs.jobs
await job.update(progress=100)
assert job.name not in coresys.jobs.jobs
async def test_update_job(coresys: CoreSys):
"""Test updating jobs."""
job = coresys.jobs.get_job(TEST_JOB)
await job.update(progress=50, stage="stage")
assert job.progress == 50
assert job.stage == "stage"