2017-06-17 22:47:56 +02:00

400 lines
13 KiB
Python

"""Init file for HassIO addons."""
import copy
import logging
import json
from pathlib import Path, PurePath
import re
import voluptuous as vol
from voluptuous.humanize import humanize_error
from .util import extract_hash_from_path
from .validate import (
validate_options, SCHEMA_ADDON_CONFIG, SCHEMA_REPOSITORY_CONFIG,
MAP_VOLUME)
from ..const import (
FILE_HASSIO_ADDONS, ATTR_NAME, ATTR_VERSION, ATTR_SLUG, ATTR_DESCRIPTON,
ATTR_STARTUP, ATTR_BOOT, ATTR_MAP, ATTR_OPTIONS, ATTR_PORTS, BOOT_AUTO,
ATTR_SCHEMA, ATTR_IMAGE, ATTR_REPOSITORY, ATTR_URL, ATTR_ARCH,
ATTR_LOCATON, ATTR_DEVICES, ATTR_ENVIRONMENT, ATTR_HOST_NETWORK,
ATTR_TMPFS, ATTR_PRIVILEGED)
from ..config import Config
from ..tools import read_json_file, write_json_file
# Module-level logger named after this module.
_LOGGER = logging.getLogger(__name__)
# Top-level keys of the persisted addons file: AddonsData.save() writes
# its system and user stores under these two keys and __init__ reads
# them back.
SYSTEM = 'system'
USER = 'user'
# Identifiers of the two built-in repositories. They are used as the
# repository name when reading the core/local addon folders and as keys
# into the data loaded from built-in.json.
REPOSITORY_CORE = 'core'
REPOSITORY_LOCAL = 'local'
# Compiled form of the MAP_VOLUME pattern imported from .validate.
# AddonsData.map_volumes() reads match group 1 as the volume name and
# group 2 as an optional policy.
RE_VOLUME = re.compile(MAP_VOLUME)
class AddonsData(Config):
    """Hold data for addons inside HassIO.

    Three dictionaries, all keyed by addon slug, are maintained:

    * ``_system_data`` -- a copy of the addon config taken from the
      repository cache when the addon was installed or updated.
    * ``_user_data`` -- per-addon user settings (options, installed
      version and an optional boot override).
    * ``_addons_cache`` -- addon configs currently found in the
      repositories; rebuilt by ``read_data_from_repositories``.

    Only the system and user dictionaries are written by ``save``.
    """

    def __init__(self, config):
        """Initialize data holder.

        ``config`` must expose the path attributes used by this class
        (``path_addons_core``, ``path_addons_local``, ``path_addons_git``,
        ``path_addons_data`` and ``path_extern_addons_data``).
        """
        super().__init__(FILE_HASSIO_ADDONS)
        self.config = config
        self._system_data = self._data.get(SYSTEM, {})
        self._user_data = self._data.get(USER, {})
        self._addons_cache = {}
        self._repositories_data = {}
        # Substituted into image names by get_image(); it starts as None,
        # so presumably the owner assigns it before images are resolved.
        self.arch = None

    def save(self):
        """Store data to config file."""
        self._data = {
            USER: self._user_data,
            SYSTEM: self._system_data,
        }
        super().save()

    def read_data_from_repositories(self):
        """Read data from addons repository.

        Resets the addon cache and the repository information, then
        reads the core, local and every custom git repository folder.
        """
        self._addons_cache = {}
        self._repositories_data = {}

        # read core repository
        self._read_addons_folder(
            self.config.path_addons_core, REPOSITORY_CORE)

        # read local repository
        self._read_addons_folder(
            self.config.path_addons_local, REPOSITORY_LOCAL)

        # add built-in repositories information
        self._set_builtin_repositories()

        # read custom git repositories
        for repository_element in self.config.path_addons_git.iterdir():
            if repository_element.is_dir():
                self._read_git_repository(repository_element)

    def _read_git_repository(self, path):
        """Process a custom repository folder.

        The repository is skipped (with a warning) when its
        ``repository.json`` can not be read, is not valid JSON or does
        not pass ``SCHEMA_REPOSITORY_CONFIG``.
        """
        slug = extract_hash_from_path(path)
        repository_info = {ATTR_SLUG: slug}

        # exists repository json
        repository_file = Path(path, "repository.json")
        try:
            repository_info.update(SCHEMA_REPOSITORY_CONFIG(
                read_json_file(repository_file)
            ))
        # A malformed file must not abort the scan of the remaining
        # repositories, so JSON errors are handled like I/O errors
        # (same as in _set_builtin_repositories).
        except (OSError, json.JSONDecodeError):
            _LOGGER.warning("Can't read repository information from %s",
                            repository_file)
            return
        except vol.Invalid:
            _LOGGER.warning("Repository parse error %s", repository_file)
            return

        # process data
        self._repositories_data[slug] = repository_info
        self._read_addons_folder(path, slug)

    def _read_addons_folder(self, path, repository):
        """Read data from addons folder.

        Every ``config.json`` below ``path`` is loaded, validated with
        ``SCHEMA_ADDON_CONFIG`` and stored in the addon cache under the
        key ``"<repository>_<addon slug>"``. Unreadable or invalid files
        are logged and skipped.
        """
        for addon in path.glob("**/config.json"):
            try:
                addon_config = read_json_file(addon)
            except (OSError, json.JSONDecodeError):
                # One broken addon must not hide the others.
                _LOGGER.warning("Can't read %s", addon)
                continue

            # validate
            try:
                addon_config = SCHEMA_ADDON_CONFIG(addon_config)
            except vol.Invalid as ex:
                _LOGGER.warning("Can't read %s -> %s", addon,
                                humanize_error(addon_config, ex))
                continue

            # Generate slug
            addon_slug = "{}_{}".format(
                repository, addon_config[ATTR_SLUG])

            # store
            addon_config[ATTR_REPOSITORY] = repository
            addon_config[ATTR_LOCATON] = str(addon.parent)
            self._addons_cache[addon_slug] = addon_config

    def _set_builtin_repositories(self):
        """Add local built-in repository into dataset.

        Information for the core/local repository is only registered
        when at least one cached addon belongs to that repository.
        """
        try:
            builtin_file = Path(__file__).parent.joinpath('built-in.json')
            builtin_data = read_json_file(builtin_file)
        except (OSError, json.JSONDecodeError) as err:
            _LOGGER.warning("Can't read built-in.json -> %s", err)
            return

        # Repositories that currently provide at least one addon.
        available = {
            data[ATTR_REPOSITORY] for data in self._addons_cache.values()
        }
        for repository in (REPOSITORY_CORE, REPOSITORY_LOCAL):
            if repository in available:
                self._repositories_data[repository] = \
                    builtin_data[repository]

    def merge_update_config(self):
        """Update local config if they have update.

        It need to be the same version as the local version is.
        """
        have_change = False

        for addon in self.list_installed:
            # detached
            if addon not in self._addons_cache:
                continue

            cache = self._addons_cache[addon]
            data = self._system_data[addon]
            # Same version but different content: the repository config
            # was changed without a version bump, so take it over.
            if data[ATTR_VERSION] == cache[ATTR_VERSION] and data != cache:
                self._system_data[addon] = copy.deepcopy(cache)
                have_change = True

        if have_change:
            self.save()

    @property
    def list_installed(self):
        """Return a set of installed addons."""
        return set(self._system_data)

    @property
    def list_all(self):
        """Return a set of all addons (installed or in a repository)."""
        return set(self._system_data) | set(self._addons_cache)

    def list_startup(self, start_type):
        """Get list of installed addon with need start by type.

        Only addons whose boot setting is ``BOOT_AUTO`` and whose
        startup type equals ``start_type`` are returned. Addons with
        incomplete stored data are logged and skipped.
        """
        addon_list = set()
        for addon in self._system_data:
            try:
                # get_boot() reads the user data as well; keep it inside
                # the try so an addon without user data is reported as
                # orphaned instead of raising KeyError.
                if self.get_boot(addon) != BOOT_AUTO:
                    continue
                if self._system_data[addon][ATTR_STARTUP] == start_type:
                    addon_list.add(addon)
            except KeyError:
                _LOGGER.warning("Orphaned addon detect %s", addon)

        return addon_list

    @property
    def list_detached(self):
        """Return local addons they not support from repo."""
        return {
            addon for addon in self._system_data
            if addon not in self._addons_cache
        }

    @property
    def list_repositories(self):
        """Return list of addon repositories."""
        return list(self._repositories_data.values())

    def exists_addon(self, addon):
        """Return True if a addon exists."""
        return addon in self._addons_cache or addon in self._system_data

    def is_installed(self, addon):
        """Return True if a addon is installed."""
        return addon in self._system_data

    def version_installed(self, addon):
        """Return installed version, or None if not installed."""
        return self._user_data.get(addon, {}).get(ATTR_VERSION)

    def set_addon_install(self, addon, version):
        """Set addon as installed.

        Raises KeyError if the addon is not in the repository cache.
        """
        self._system_data[addon] = copy.deepcopy(self._addons_cache[addon])
        self._user_data[addon] = {
            ATTR_OPTIONS: {},
            ATTR_VERSION: version,
        }
        self.save()

    def set_addon_uninstall(self, addon):
        """Set addon as uninstalled."""
        self._system_data.pop(addon, None)
        self._user_data.pop(addon, None)
        self.save()

    def set_addon_update(self, addon, version):
        """Update version of addon."""
        self._system_data[addon] = copy.deepcopy(self._addons_cache[addon])
        self._user_data[addon][ATTR_VERSION] = version
        self.save()

    def set_options(self, addon, options):
        """Store user addon options."""
        self._user_data[addon][ATTR_OPTIONS] = copy.deepcopy(options)
        self.save()

    def set_boot(self, addon, boot):
        """Store user boot options."""
        self._user_data[addon][ATTR_BOOT] = boot
        self.save()

    def get_options(self, addon):
        """Return options with local changes.

        User options override the defaults stored with the addon.
        """
        return {
            **self._system_data[addon][ATTR_OPTIONS],
            **self._user_data[addon][ATTR_OPTIONS],
        }

    def get_boot(self, addon):
        """Return boot config with prio local settings."""
        if ATTR_BOOT in self._user_data[addon]:
            return self._user_data[addon][ATTR_BOOT]
        return self._system_data[addon][ATTR_BOOT]

    def get_name(self, addon):
        """Return name of addon."""
        if addon in self._addons_cache:
            return self._addons_cache[addon][ATTR_NAME]
        return self._system_data[addon][ATTR_NAME]

    def get_description(self, addon):
        """Return description of addon."""
        if addon in self._addons_cache:
            return self._addons_cache[addon][ATTR_DESCRIPTON]
        return self._system_data[addon][ATTR_DESCRIPTON]

    def get_repository(self, addon):
        """Return repository of addon."""
        if addon in self._addons_cache:
            return self._addons_cache[addon][ATTR_REPOSITORY]
        return self._system_data[addon][ATTR_REPOSITORY]

    def get_last_version(self, addon):
        """Return version of addon.

        Falls back to the installed version when the addon is no longer
        available from a repository.
        """
        if addon in self._addons_cache:
            return self._addons_cache[addon][ATTR_VERSION]
        return self.version_installed(addon)

    def get_ports(self, addon):
        """Return ports of addon."""
        return self._system_data[addon].get(ATTR_PORTS)

    def get_network_mode(self, addon):
        """Return network mode of addon ('host' or 'bridge')."""
        if self._system_data[addon][ATTR_HOST_NETWORK]:
            return 'host'
        return 'bridge'

    def get_devices(self, addon):
        """Return devices of addon."""
        return self._system_data[addon].get(ATTR_DEVICES)

    def get_tmpfs(self, addon):
        """Return tmpfs of addon."""
        return self._system_data[addon].get(ATTR_TMPFS)

    def get_environment(self, addon):
        """Return environment of addon."""
        return self._system_data[addon].get(ATTR_ENVIRONMENT)

    def get_privileged(self, addon):
        """Return list of privilege."""
        return self._system_data[addon].get(ATTR_PRIVILEGED)

    def get_url(self, addon):
        """Return url of addon."""
        if addon in self._addons_cache:
            return self._addons_cache[addon].get(ATTR_URL)
        return self._system_data[addon].get(ATTR_URL)

    def get_arch(self, addon):
        """Return list of supported arch."""
        if addon in self._addons_cache:
            return self._addons_cache[addon][ATTR_ARCH]
        return self._system_data[addon][ATTR_ARCH]

    def get_image(self, addon):
        """Return image name of addon.

        Installed data takes precedence over the repository cache.
        """
        addon_data = self._system_data.get(
            addon, self._addons_cache.get(addon)
        )

        # Repository with dockerhub images
        if ATTR_IMAGE in addon_data:
            return addon_data[ATTR_IMAGE].format(arch=self.arch)

        # local build
        return "{}/{}-addon-{}".format(
            addon_data[ATTR_REPOSITORY], self.arch, addon_data[ATTR_SLUG])

    def need_build(self, addon):
        """Return True if this addon need a local build."""
        addon_data = self._system_data.get(
            addon, self._addons_cache.get(addon)
        )
        return ATTR_IMAGE not in addon_data

    def map_volumes(self, addon):
        """Return a dict of {volume: policy} from addon.

        The policy defaults to 'ro' when the map entry does not carry
        one. Entries that do not match ``RE_VOLUME`` are logged and
        skipped.
        """
        volumes = {}
        for volume in self._system_data[addon][ATTR_MAP]:
            result = RE_VOLUME.match(volume)
            if result is None:
                # Stored data may predate or bypass schema validation;
                # do not fail the whole mapping for one bad entry.
                _LOGGER.warning("Addon %s have invalid volume map %s",
                                addon, volume)
                continue
            volumes[result.group(1)] = result.group(2) or 'ro'

        return volumes

    def path_data(self, addon):
        """Return addon data path inside supervisor."""
        return Path(self.config.path_addons_data, addon)

    def path_extern_data(self, addon):
        """Return addon data path external for docker."""
        return PurePath(self.config.path_extern_addons_data, addon)

    def path_addon_options(self, addon):
        """Return path to addons options."""
        return Path(self.path_data(addon), "options.json")

    def path_addon_location(self, addon):
        """Return path to this addon."""
        return Path(self._addons_cache[addon][ATTR_LOCATON])

    def write_addon_options(self, addon):
        """Validate the merged options and write them to options.json.

        Returns the result of ``write_json_file``, or False when the
        options do not pass the addon schema.
        """
        schema = self.get_schema(addon)
        options = self.get_options(addon)

        try:
            schema(options)
            return write_json_file(self.path_addon_options(addon), options)
        except vol.Invalid as ex:
            _LOGGER.error("Addon %s have wrong options -> %s", addon,
                          humanize_error(options, ex))

        return False

    def get_schema(self, addon):
        """Create a schema for addon options.

        When the stored schema is a bool, any dict is accepted;
        otherwise the options are checked with ``validate_options``.
        """
        raw_schema = self._system_data[addon][ATTR_SCHEMA]

        if isinstance(raw_schema, bool):
            return vol.Schema(dict)
        return vol.Schema(vol.All(dict, validate_options(raw_schema)))