Files
supervisor/hassio/coresys.py
Pascal Vizeli 35aae69f23 Support armv7 and allow support of multiple arch types per CPU (#892)
* Support armv7 and first abstraction

* Change layout

* Add more type hints

* Fix imports

* Update

* move forward

* add tests

* fix type

* fix lint & tests

* fix tests

* Fix unittests

* Fix create folder

* cleanup

* Fix import order

* cleanup loop parameter

* cleanup init function

* Allow changeable image name

* fix setup

* Fix load of arch

* Fix lint

* Add typing

* fix init

* fix hassos cli problem & stick on supervisor arch

* address comments

* cleanup

* Fix image selfheal

* Add comment

* update uvloop

* remove uvloop

* fix tagging

* Fix install name

* Fix validate build config

* Abstract image_name from system cache
2019-01-31 18:47:44 +01:00

456 lines
12 KiB
Python

"""Handle core shared data."""
from __future__ import annotations

import asyncio
from typing import TYPE_CHECKING, Optional

import aiohttp

from .config import CoreConfig
from .const import CHANNEL_DEV
from .docker import DockerAPI
from .misc.dns import DNSForward
from .misc.hardware import Hardware
from .misc.scheduler import Scheduler
if TYPE_CHECKING:
from .addons import AddonManager
from .api import RestAPI
from .arch import CpuArch
from .auth import Auth
from .core import HassIO
from .dbus import DBusManager
from .discovery import Discovery
from .hassos import HassOS
from .homeassistant import HomeAssistant
from .host import HostManager
from .services import ServiceManager
from .snapshots import SnapshotManager
from .supervisor import Supervisor
from .tasks import Tasks
from .updater import Updater
class CoreSys:
    """Class that handle all shared data.

    Holds the event loop, the shared HTTP client sessions and the global
    helper objects created in ``__init__``, plus write-once references to the
    manager objects that are attached afterwards through the property setters.
    Every setter raises ``RuntimeError`` when its slot is already occupied.
    """

    def __init__(self):
        """Initialize coresys.

        The loop is taken from ``asyncio.get_running_loop()``, so this must be
        called while an event loop is running.
        """
        # Static attributes
        self.machine_id: Optional[str] = None

        # External objects
        self._loop: asyncio.BaseEventLoop = asyncio.get_running_loop()
        self._websession: aiohttp.ClientSession = aiohttp.ClientSession()
        # Second session whose connector is created with ssl=False.
        self._websession_ssl: aiohttp.ClientSession = aiohttp.ClientSession(
            connector=aiohttp.TCPConnector(ssl=False))

        # Global objects
        self._config: CoreConfig = CoreConfig()
        self._hardware: Hardware = Hardware()
        self._docker: DockerAPI = DockerAPI()
        self._scheduler: Scheduler = Scheduler()
        self._dns: DNSForward = DNSForward()

        # Internal objects pointers (None until assigned via the setters)
        self._core: HassIO = None
        self._arch: CpuArch = None
        self._auth: Auth = None
        self._homeassistant: HomeAssistant = None
        self._supervisor: Supervisor = None
        self._addons: AddonManager = None
        self._api: RestAPI = None
        self._updater: Updater = None
        self._snapshots: SnapshotManager = None
        self._tasks: Tasks = None
        self._host: HostManager = None
        self._dbus: DBusManager = None
        self._hassos: HassOS = None
        self._services: ServiceManager = None
        self._discovery: Discovery = None

    @property
    def machine(self) -> Optional[str]:
        """Return running machine type of the Hass.io system.

        Returns None while no Home Assistant object has been attached.
        """
        if self._homeassistant is not None:
            return self._homeassistant.machine
        return None

    @property
    def dev(self) -> bool:
        """Return True if we run dev mode.

        Returns False while no Updater object has been attached, instead of
        failing on the missing object.
        """
        if self._updater is None:
            return False
        return self._updater.channel == CHANNEL_DEV

    @property
    def timezone(self) -> str:
        """Return timezone."""
        return self._config.timezone

    @property
    def loop(self) -> asyncio.BaseEventLoop:
        """Return loop object."""
        return self._loop

    @property
    def websession(self) -> aiohttp.ClientSession:
        """Return websession object."""
        return self._websession

    @property
    def websession_ssl(self) -> aiohttp.ClientSession:
        """Return websession object with disabled SSL."""
        return self._websession_ssl

    @property
    def config(self) -> CoreConfig:
        """Return CoreConfig object."""
        return self._config

    @property
    def hardware(self) -> Hardware:
        """Return Hardware object."""
        return self._hardware

    @property
    def docker(self) -> DockerAPI:
        """Return DockerAPI object."""
        return self._docker

    @property
    def scheduler(self) -> Scheduler:
        """Return Scheduler object."""
        return self._scheduler

    @property
    def dns(self) -> DNSForward:
        """Return DNSForward object."""
        return self._dns

    # The setters below compare against None rather than relying on
    # truthiness, so an attached object that happens to evaluate as false
    # still counts as "already set".

    @property
    def core(self) -> HassIO:
        """Return HassIO object."""
        return self._core

    @core.setter
    def core(self, value: HassIO):
        """Set a Hass.io object.

        Raises RuntimeError if one is already set.
        """
        if self._core is not None:
            raise RuntimeError("Hass.io already set!")
        self._core = value

    @property
    def arch(self) -> CpuArch:
        """Return CpuArch object."""
        return self._arch

    @arch.setter
    def arch(self, value: CpuArch):
        """Set a CpuArch object.

        Raises RuntimeError if one is already set.
        """
        if self._arch is not None:
            raise RuntimeError("CpuArch already set!")
        self._arch = value

    @property
    def auth(self) -> Auth:
        """Return Auth object."""
        return self._auth

    @auth.setter
    def auth(self, value: Auth):
        """Set an Auth object.

        Raises RuntimeError if one is already set.
        """
        if self._auth is not None:
            raise RuntimeError("Auth already set!")
        self._auth = value

    @property
    def homeassistant(self) -> HomeAssistant:
        """Return Home Assistant object."""
        return self._homeassistant

    @homeassistant.setter
    def homeassistant(self, value: HomeAssistant):
        """Set a HomeAssistant object.

        Raises RuntimeError if one is already set.
        """
        if self._homeassistant is not None:
            raise RuntimeError("Home Assistant already set!")
        self._homeassistant = value

    @property
    def supervisor(self) -> Supervisor:
        """Return Supervisor object."""
        return self._supervisor

    @supervisor.setter
    def supervisor(self, value: Supervisor):
        """Set a Supervisor object.

        Raises RuntimeError if one is already set.
        """
        if self._supervisor is not None:
            raise RuntimeError("Supervisor already set!")
        self._supervisor = value

    @property
    def api(self) -> RestAPI:
        """Return API object."""
        return self._api

    @api.setter
    def api(self, value: RestAPI):
        """Set an API object.

        Raises RuntimeError if one is already set.
        """
        if self._api is not None:
            raise RuntimeError("API already set!")
        self._api = value

    @property
    def updater(self) -> Updater:
        """Return Updater object."""
        return self._updater

    @updater.setter
    def updater(self, value: Updater):
        """Set an Updater object.

        Raises RuntimeError if one is already set.
        """
        if self._updater is not None:
            raise RuntimeError("Updater already set!")
        self._updater = value

    @property
    def addons(self) -> AddonManager:
        """Return AddonManager object."""
        return self._addons

    @addons.setter
    def addons(self, value: AddonManager):
        """Set an AddonManager object.

        Raises RuntimeError if one is already set.
        """
        if self._addons is not None:
            raise RuntimeError("AddonManager already set!")
        self._addons = value

    @property
    def snapshots(self) -> SnapshotManager:
        """Return SnapshotManager object."""
        return self._snapshots

    @snapshots.setter
    def snapshots(self, value: SnapshotManager):
        """Set a SnapshotManager object.

        Raises RuntimeError if one is already set.
        """
        if self._snapshots is not None:
            raise RuntimeError("SnapshotsManager already set!")
        self._snapshots = value

    @property
    def tasks(self) -> Tasks:
        """Return Tasks object."""
        return self._tasks

    @tasks.setter
    def tasks(self, value: Tasks):
        """Set a Tasks object.

        Raises RuntimeError if one is already set.
        """
        if self._tasks is not None:
            raise RuntimeError("Tasks already set!")
        self._tasks = value

    @property
    def services(self) -> ServiceManager:
        """Return ServiceManager object."""
        return self._services

    @services.setter
    def services(self, value: ServiceManager):
        """Set a ServiceManager object.

        Raises RuntimeError if one is already set.
        """
        if self._services is not None:
            raise RuntimeError("Services already set!")
        self._services = value

    @property
    def discovery(self) -> Discovery:
        """Return Discovery object."""
        return self._discovery

    @discovery.setter
    def discovery(self, value: Discovery):
        """Set a Discovery object.

        Raises RuntimeError if one is already set.
        """
        if self._discovery is not None:
            raise RuntimeError("Discovery already set!")
        self._discovery = value

    @property
    def dbus(self) -> DBusManager:
        """Return DBusManager object."""
        return self._dbus

    @dbus.setter
    def dbus(self, value: DBusManager):
        """Set a DBusManager object.

        Raises RuntimeError if one is already set.
        """
        if self._dbus is not None:
            raise RuntimeError("DBusManager already set!")
        self._dbus = value

    @property
    def host(self) -> HostManager:
        """Return HostManager object."""
        return self._host

    @host.setter
    def host(self, value: HostManager):
        """Set a HostManager object.

        Raises RuntimeError if one is already set.
        """
        if self._host is not None:
            raise RuntimeError("HostManager already set!")
        self._host = value

    @property
    def hassos(self) -> HassOS:
        """Return HassOS object."""
        return self._hassos

    @hassos.setter
    def hassos(self, value: HassOS):
        """Set a HassOS object.

        Raises RuntimeError if one is already set.
        """
        if self._hassos is not None:
            raise RuntimeError("HassOS already set!")
        self._hassos = value
class CoreSysAttributes:
    """Inherit basic CoreSysAttributes.

    Mixin that exposes ``sys_*`` shortcuts; each one forwards to the
    same-named attribute (without the ``sys_`` prefix) of ``self.coresys``.
    """

    # Placeholder only: every accessor below dereferences this attribute, so
    # it has to be replaced with a CoreSys instance before use.
    # NOTE(review): presumably assigned by the inheriting classes - confirm.
    coresys = None

    @property
    def sys_machine(self) -> Optional[str]:
        """Return running machine type of the Hass.io system."""
        return self.coresys.machine

    @property
    def sys_dev(self) -> bool:
        """Return True if we run dev mode."""
        return self.coresys.dev

    @property
    def sys_timezone(self) -> str:
        """Return timezone."""
        return self.coresys.timezone

    @property
    def sys_machine_id(self) -> Optional[str]:
        """Return machine id."""
        return self.coresys.machine_id

    @property
    def sys_loop(self) -> asyncio.BaseEventLoop:
        """Return loop object."""
        return self.coresys.loop

    @property
    def sys_websession(self) -> aiohttp.ClientSession:
        """Return websession object."""
        return self.coresys.websession

    @property
    def sys_websession_ssl(self) -> aiohttp.ClientSession:
        """Return websession object with disabled SSL."""
        return self.coresys.websession_ssl

    @property
    def sys_config(self) -> CoreConfig:
        """Return CoreConfig object."""
        return self.coresys.config

    @property
    def sys_hardware(self) -> Hardware:
        """Return Hardware object."""
        return self.coresys.hardware

    @property
    def sys_docker(self) -> DockerAPI:
        """Return DockerAPI object."""
        return self.coresys.docker

    @property
    def sys_scheduler(self) -> Scheduler:
        """Return Scheduler object."""
        return self.coresys.scheduler

    @property
    def sys_dns(self) -> DNSForward:
        """Return DNSForward object."""
        return self.coresys.dns

    @property
    def sys_core(self) -> HassIO:
        """Return HassIO object."""
        return self.coresys.core

    @property
    def sys_arch(self) -> CpuArch:
        """Return CpuArch object."""
        return self.coresys.arch

    @property
    def sys_auth(self) -> Auth:
        """Return Auth object."""
        return self.coresys.auth

    @property
    def sys_homeassistant(self) -> HomeAssistant:
        """Return Home Assistant object."""
        return self.coresys.homeassistant

    @property
    def sys_supervisor(self) -> Supervisor:
        """Return Supervisor object."""
        return self.coresys.supervisor

    @property
    def sys_api(self) -> RestAPI:
        """Return API object."""
        return self.coresys.api

    @property
    def sys_updater(self) -> Updater:
        """Return Updater object."""
        return self.coresys.updater

    @property
    def sys_addons(self) -> AddonManager:
        """Return AddonManager object."""
        return self.coresys.addons

    @property
    def sys_snapshots(self) -> SnapshotManager:
        """Return SnapshotManager object."""
        return self.coresys.snapshots

    @property
    def sys_tasks(self) -> Tasks:
        """Return Tasks object."""
        return self.coresys.tasks

    @property
    def sys_services(self) -> ServiceManager:
        """Return ServiceManager object."""
        return self.coresys.services

    @property
    def sys_discovery(self) -> Discovery:
        """Return Discovery object."""
        return self.coresys.discovery

    @property
    def sys_dbus(self) -> DBusManager:
        """Return DBusManager object."""
        return self.coresys.dbus

    @property
    def sys_host(self) -> HostManager:
        """Return HostManager object."""
        return self.coresys.host

    @property
    def sys_hassos(self) -> HassOS:
        """Return HassOS object."""
        return self.coresys.hassos

    def sys_run_in_executor(self, funct, *args) -> asyncio.Future:
        """Wrapper for executor pool.

        Calls ``run_in_executor`` on the coresys loop with ``None`` as the
        executor argument and returns the resulting future.
        """
        return self.sys_loop.run_in_executor(None, funct, *args)

    def sys_create_task(self, coroutine) -> asyncio.Task:
        """Wrapper for async task.

        Schedules ``coroutine`` on the coresys loop and returns the task.
        """
        return self.sys_loop.create_task(coroutine)