Pascal Vizeli decf254e5f
Ingress panel support ()
* Ingress Panel support

* Fix lists

* Allow to set the value

* fix panels

* Update ha realtime

* Fix url

* Fix update
2019-04-23 11:18:04 +02:00

392 lines
12 KiB
Python

"""Validate add-ons options schema."""
import logging
import re
import secrets
import uuid
import voluptuous as vol
from ..const import (
ARCH_ALL,
ATTR_ACCESS_TOKEN,
ATTR_APPARMOR,
ATTR_ARCH,
ATTR_ARGS,
ATTR_AUDIO,
ATTR_AUDIO_INPUT,
ATTR_AUDIO_OUTPUT,
ATTR_AUTH_API,
ATTR_AUTO_UART,
ATTR_AUTO_UPDATE,
ATTR_BOOT,
ATTR_BUILD_FROM,
ATTR_DESCRIPTON,
ATTR_DEVICES,
ATTR_DEVICETREE,
ATTR_DISCOVERY,
ATTR_DOCKER_API,
ATTR_ENVIRONMENT,
ATTR_FULL_ACCESS,
ATTR_GPIO,
ATTR_HASSIO_API,
ATTR_HASSIO_ROLE,
ATTR_HOMEASSISTANT,
ATTR_HOMEASSISTANT_API,
ATTR_HOST_DBUS,
ATTR_HOST_IPC,
ATTR_HOST_NETWORK,
ATTR_HOST_PID,
ATTR_IMAGE,
ATTR_INGRESS,
ATTR_INGRESS_ENTRY,
ATTR_INGRESS_PORT,
ATTR_INGRESS_TOKEN,
ATTR_INGRESS_PANEL,
ATTR_INGRESS_PANEL_ADMIN,
ATTR_INGRESS_PANEL_ICON,
ATTR_INGRESS_PANEL_TITLE,
ATTR_KERNEL_MODULES,
ATTR_LEGACY,
ATTR_LOCATON,
ATTR_MACHINE,
ATTR_MAINTAINER,
ATTR_MAP,
ATTR_NAME,
ATTR_NETWORK,
ATTR_OPTIONS,
ATTR_PORTS,
ATTR_PORTS_DESCRIPTION,
ATTR_PRIVILEGED,
ATTR_PROTECTED,
ATTR_REPOSITORY,
ATTR_SCHEMA,
ATTR_SERVICES,
ATTR_SLUG,
ATTR_SQUASH,
ATTR_STARTUP,
ATTR_STATE,
ATTR_STDIN,
ATTR_SYSTEM,
ATTR_TIMEOUT,
ATTR_TMPFS,
ATTR_URL,
ATTR_USER,
ATTR_UUID,
ATTR_VERSION,
ATTR_WEBUI,
BOOT_AUTO,
BOOT_MANUAL,
PRIVILEGED_ALL,
ROLE_ALL,
ROLE_DEFAULT,
STARTUP_ALL,
STARTUP_APPLICATION,
STARTUP_SERVICES,
STATE_STARTED,
STATE_STOPPED,
)
from ..discovery.validate import valid_discovery_service
from ..validate import (
ALSA_DEVICE,
DOCKER_PORTS,
DOCKER_PORTS_DESCRIPTION,
NETWORK_PORT,
TOKEN,
UUID_MATCH,
)
_LOGGER = logging.getLogger(__name__)
RE_VOLUME = re.compile(r"^(config|ssl|addons|backup|share)(?::(rw|ro))?$")
RE_SERVICE = re.compile(r"^(?P<service>mqtt):(?P<rights>provide|want|need)$")
V_STR = 'str'
V_INT = 'int'
V_FLOAT = 'float'
V_BOOL = 'bool'
V_EMAIL = 'email'
V_URL = 'url'
V_PORT = 'port'
V_MATCH = 'match'
RE_SCHEMA_ELEMENT = re.compile(
r"^(?:"
r"|str|bool|email|url|port"
r"|int(?:\((?P<i_min>\d+)?,(?P<i_max>\d+)?\))?"
r"|float(?:\((?P<f_min>[\d\.]+)?,(?P<f_max>[\d\.]+)?\))?"
r"|match\((?P<match>.*)\)"
r")\??$"
)
RE_DOCKER_IMAGE = re.compile(
r"^([a-zA-Z\-\.:\d{}]+/)*?([\-\w{}]+)/([\-\w{}]+)$")
RE_DOCKER_IMAGE_BUILD = re.compile(
r"^([a-zA-Z\-\.:\d{}]+/)*?([\-\w{}]+)/([\-\w{}]+)(:[\.\-\w{}]+)?$")
SCHEMA_ELEMENT = vol.Match(RE_SCHEMA_ELEMENT)
MACHINE_ALL = [
'intel-nuc', 'odroid-c2', 'odroid-xu', 'orangepi-prime', 'qemux86',
'qemux86-64', 'qemuarm', 'qemuarm-64', 'raspberrypi', 'raspberrypi2',
'raspberrypi3', 'raspberrypi3-64', 'tinker',
]
def _simple_startup(value):
"""Simple startup schema."""
if value == "before":
return STARTUP_SERVICES
if value == "after":
return STARTUP_APPLICATION
return value
# pylint: disable=no-value-for-parameter
SCHEMA_ADDON_CONFIG = vol.Schema({
vol.Required(ATTR_NAME): vol.Coerce(str),
vol.Required(ATTR_VERSION): vol.Coerce(str),
vol.Required(ATTR_SLUG): vol.Coerce(str),
vol.Required(ATTR_DESCRIPTON): vol.Coerce(str),
vol.Required(ATTR_ARCH): [vol.In(ARCH_ALL)],
vol.Optional(ATTR_MACHINE): [vol.In(MACHINE_ALL)],
vol.Optional(ATTR_URL): vol.Url(),
vol.Required(ATTR_STARTUP):
vol.All(_simple_startup, vol.In(STARTUP_ALL)),
vol.Required(ATTR_BOOT):
vol.In([BOOT_AUTO, BOOT_MANUAL]),
vol.Optional(ATTR_PORTS): DOCKER_PORTS,
vol.Optional(ATTR_PORTS_DESCRIPTION): DOCKER_PORTS_DESCRIPTION,
vol.Optional(ATTR_WEBUI):
vol.Match(r"^(?:https?|\[PROTO:\w+\]):\/\/\[HOST\]:\[PORT:\d+\].*$"),
vol.Optional(ATTR_INGRESS, default=False): vol.Boolean(),
vol.Optional(ATTR_INGRESS_PORT, default=8099): vol.Any(NETWORK_PORT, vol.Equal(0)),
vol.Optional(ATTR_INGRESS_ENTRY): vol.Coerce(str),
vol.Optional(ATTR_INGRESS_PANEL_ICON, default="mdi:puzzle"): vol.Coerce(str),
vol.Optional(ATTR_INGRESS_PANEL_TITLE): vol.Coerce(str),
vol.Optional(ATTR_INGRESS_PANEL_ADMIN, default=True): vol.Boolean(),
vol.Optional(ATTR_HOMEASSISTANT): vol.Maybe(vol.Coerce(str)),
vol.Optional(ATTR_HOST_NETWORK, default=False): vol.Boolean(),
vol.Optional(ATTR_HOST_PID, default=False): vol.Boolean(),
vol.Optional(ATTR_HOST_IPC, default=False): vol.Boolean(),
vol.Optional(ATTR_HOST_DBUS, default=False): vol.Boolean(),
vol.Optional(ATTR_DEVICES): [vol.Match(r"^(.*):(.*):([rwm]{1,3})$")],
vol.Optional(ATTR_AUTO_UART, default=False): vol.Boolean(),
vol.Optional(ATTR_TMPFS):
vol.Match(r"^size=(\d)*[kmg](,uid=\d{1,4})?(,rw)?$"),
vol.Optional(ATTR_MAP, default=list): [vol.Match(RE_VOLUME)],
vol.Optional(ATTR_ENVIRONMENT): {vol.Match(r"\w*"): vol.Coerce(str)},
vol.Optional(ATTR_PRIVILEGED): [vol.In(PRIVILEGED_ALL)],
vol.Optional(ATTR_APPARMOR, default=True): vol.Boolean(),
vol.Optional(ATTR_FULL_ACCESS, default=False): vol.Boolean(),
vol.Optional(ATTR_AUDIO, default=False): vol.Boolean(),
vol.Optional(ATTR_GPIO, default=False): vol.Boolean(),
vol.Optional(ATTR_DEVICETREE, default=False): vol.Boolean(),
vol.Optional(ATTR_KERNEL_MODULES, default=False): vol.Boolean(),
vol.Optional(ATTR_HASSIO_API, default=False): vol.Boolean(),
vol.Optional(ATTR_HASSIO_ROLE, default=ROLE_DEFAULT): vol.In(ROLE_ALL),
vol.Optional(ATTR_HOMEASSISTANT_API, default=False): vol.Boolean(),
vol.Optional(ATTR_STDIN, default=False): vol.Boolean(),
vol.Optional(ATTR_LEGACY, default=False): vol.Boolean(),
vol.Optional(ATTR_DOCKER_API, default=False): vol.Boolean(),
vol.Optional(ATTR_AUTH_API, default=False): vol.Boolean(),
vol.Optional(ATTR_SERVICES): [vol.Match(RE_SERVICE)],
vol.Optional(ATTR_DISCOVERY): [valid_discovery_service],
vol.Required(ATTR_OPTIONS): dict,
vol.Required(ATTR_SCHEMA): vol.Any(vol.Schema({
vol.Coerce(str): vol.Any(SCHEMA_ELEMENT, [
vol.Any(
SCHEMA_ELEMENT,
{vol.Coerce(str): vol.Any(SCHEMA_ELEMENT, [SCHEMA_ELEMENT])}
),
], vol.Schema({
vol.Coerce(str): vol.Any(SCHEMA_ELEMENT, [SCHEMA_ELEMENT])
}))
}), False),
vol.Optional(ATTR_IMAGE):
vol.Match(RE_DOCKER_IMAGE),
vol.Optional(ATTR_TIMEOUT, default=10):
vol.All(vol.Coerce(int), vol.Range(min=10, max=120)),
}, extra=vol.REMOVE_EXTRA)
# pylint: disable=no-value-for-parameter
SCHEMA_REPOSITORY_CONFIG = vol.Schema({
vol.Required(ATTR_NAME): vol.Coerce(str),
vol.Optional(ATTR_URL): vol.Url(),
vol.Optional(ATTR_MAINTAINER): vol.Coerce(str),
}, extra=vol.REMOVE_EXTRA)
# pylint: disable=no-value-for-parameter
SCHEMA_BUILD_CONFIG = vol.Schema({
vol.Optional(ATTR_BUILD_FROM, default=dict): vol.Schema({
vol.In(ARCH_ALL): vol.Match(RE_DOCKER_IMAGE_BUILD),
}),
vol.Optional(ATTR_SQUASH, default=False): vol.Boolean(),
vol.Optional(ATTR_ARGS, default=dict): vol.Schema({
vol.Coerce(str): vol.Coerce(str)
}),
}, extra=vol.REMOVE_EXTRA)
# pylint: disable=no-value-for-parameter
SCHEMA_ADDON_USER = vol.Schema({
vol.Required(ATTR_VERSION): vol.Coerce(str),
vol.Optional(ATTR_IMAGE): vol.Coerce(str),
vol.Optional(ATTR_UUID, default=lambda: uuid.uuid4().hex): UUID_MATCH,
vol.Optional(ATTR_ACCESS_TOKEN): TOKEN,
vol.Optional(ATTR_INGRESS_TOKEN, default=secrets.token_urlsafe): vol.Coerce(str),
vol.Optional(ATTR_OPTIONS, default=dict): dict,
vol.Optional(ATTR_AUTO_UPDATE, default=False): vol.Boolean(),
vol.Optional(ATTR_BOOT):
vol.In([BOOT_AUTO, BOOT_MANUAL]),
vol.Optional(ATTR_NETWORK): DOCKER_PORTS,
vol.Optional(ATTR_AUDIO_OUTPUT): ALSA_DEVICE,
vol.Optional(ATTR_AUDIO_INPUT): ALSA_DEVICE,
vol.Optional(ATTR_PROTECTED, default=True): vol.Boolean(),
vol.Optional(ATTR_INGRESS_PANEL, default=False): vol.Boolean(),
}, extra=vol.REMOVE_EXTRA)
SCHEMA_ADDON_SYSTEM = SCHEMA_ADDON_CONFIG.extend({
vol.Required(ATTR_LOCATON): vol.Coerce(str),
vol.Required(ATTR_REPOSITORY): vol.Coerce(str),
})
SCHEMA_ADDONS_FILE = vol.Schema({
vol.Optional(ATTR_USER, default=dict): {
vol.Coerce(str): SCHEMA_ADDON_USER,
},
vol.Optional(ATTR_SYSTEM, default=dict): {
vol.Coerce(str): SCHEMA_ADDON_SYSTEM,
}
})
SCHEMA_ADDON_SNAPSHOT = vol.Schema({
vol.Required(ATTR_USER): SCHEMA_ADDON_USER,
vol.Required(ATTR_SYSTEM): SCHEMA_ADDON_SYSTEM,
vol.Required(ATTR_STATE): vol.In([STATE_STARTED, STATE_STOPPED]),
vol.Required(ATTR_VERSION): vol.Coerce(str),
}, extra=vol.REMOVE_EXTRA)
def validate_options(raw_schema):
"""Validate schema."""
def validate(struct):
"""Create schema validator for add-ons options."""
options = {}
# read options
for key, value in struct.items():
# Ignore unknown options / remove from list
if key not in raw_schema:
_LOGGER.warning("Unknown options %s", key)
continue
typ = raw_schema[key]
try:
if isinstance(typ, list):
# nested value list
options[key] = _nested_validate_list(typ[0], value, key)
elif isinstance(typ, dict):
# nested value dict
options[key] = _nested_validate_dict(typ, value, key)
else:
# normal value
options[key] = _single_validate(typ, value, key)
except (IndexError, KeyError):
raise vol.Invalid(f"Type error for {key}") from None
_check_missing_options(raw_schema, options, 'root')
return options
return validate
# pylint: disable=no-value-for-parameter
# pylint: disable=inconsistent-return-statements
def _single_validate(typ, value, key):
"""Validate a single element."""
# if required argument
if value is None:
raise vol.Invalid(f"Missing required option '{key}'")
# parse extend data from type
match = RE_SCHEMA_ELEMENT.match(typ)
# prepare range
range_args = {}
for group_name in ('i_min', 'i_max', 'f_min', 'f_max'):
group_value = match.group(group_name)
if group_value:
range_args[group_name[2:]] = float(group_value)
if typ.startswith(V_STR):
return str(value)
elif typ.startswith(V_INT):
return vol.All(vol.Coerce(int), vol.Range(**range_args))(value)
elif typ.startswith(V_FLOAT):
return vol.All(vol.Coerce(float), vol.Range(**range_args))(value)
elif typ.startswith(V_BOOL):
return vol.Boolean()(value)
elif typ.startswith(V_EMAIL):
return vol.Email()(value)
elif typ.startswith(V_URL):
return vol.Url()(value)
elif typ.startswith(V_PORT):
return NETWORK_PORT(value)
elif typ.startswith(V_MATCH):
return vol.Match(match.group('match'))(str(value))
raise vol.Invalid(f"Fatal error for {key} type {typ}")
def _nested_validate_list(typ, data_list, key):
"""Validate nested items."""
options = []
for element in data_list:
# Nested?
if isinstance(typ, dict):
c_options = _nested_validate_dict(typ, element, key)
options.append(c_options)
else:
options.append(_single_validate(typ, element, key))
return options
def _nested_validate_dict(typ, data_dict, key):
"""Validate nested items."""
options = {}
for c_key, c_value in data_dict.items():
# Ignore unknown options / remove from list
if c_key not in typ:
_LOGGER.warning("Unknown options %s", c_key)
continue
# Nested?
if isinstance(typ[c_key], list):
options[c_key] = _nested_validate_list(typ[c_key][0],
c_value, c_key)
else:
options[c_key] = _single_validate(typ[c_key], c_value, c_key)
_check_missing_options(typ, options, key)
return options
def _check_missing_options(origin, exists, root):
"""Check if all options are exists."""
missing = set(origin) - set(exists)
for miss_opt in missing:
if isinstance(origin[miss_opt], str) and \
origin[miss_opt].endswith("?"):
continue
raise vol.Invalid(f"Missing option {miss_opt} in {root}")