Files
supervisor/hassio/addons/validate.py
Pascal Vizeli 8d468328f3 Expose new function to add-ons (#138)
* Expose new function to add-ons

* Rename `hassio` to `hassio_api`

* fix lint

* done
2017-08-08 16:54:42 +02:00

229 lines
7.3 KiB
Python

"""Validate addons options schema."""
import voluptuous as vol
from ..const import (
ATTR_NAME, ATTR_VERSION, ATTR_SLUG, ATTR_DESCRIPTON, ATTR_STARTUP,
ATTR_BOOT, ATTR_MAP, ATTR_OPTIONS, ATTR_PORTS, STARTUP_ONCE,
STARTUP_SYSTEM, STARTUP_SERVICES, STARTUP_APPLICATION, STARTUP_INITIALIZE,
BOOT_AUTO, BOOT_MANUAL, ATTR_SCHEMA, ATTR_IMAGE, ATTR_URL, ATTR_MAINTAINER,
ATTR_ARCH, ATTR_DEVICES, ATTR_ENVIRONMENT, ATTR_HOST_NETWORK, ARCH_ARMHF,
ARCH_AARCH64, ARCH_AMD64, ARCH_I386, ATTR_TMPFS, ATTR_PRIVILEGED,
ATTR_USER, ATTR_STATE, ATTR_SYSTEM, STATE_STARTED, STATE_STOPPED,
ATTR_LOCATON, ATTR_REPOSITORY, ATTR_TIMEOUT, ATTR_NETWORK,
ATTR_AUTO_UPDATE, ATTR_WEBUI, ATTR_AUDIO, ATTR_AUDIO_INPUT,
ATTR_AUDIO_OUTPUT, ATTR_HASSIO_API)
from ..validate import NETWORK_PORT, DOCKER_PORTS, ALSA_CHANNEL
# Pattern for one volume-mapping entry: a known folder name, optionally
# followed by ":rw" or ":ro".  Group 1 is the folder, group 2 the access mode
# (None when no suffix is given).
MAP_VOLUME = r"^(config|ssl|addons|backup|share)(?::(rw|ro))?$"
# Type keywords usable as leaf values in an add-on options schema; each one
# is handled by a matching branch in _single_validate.
V_STR = "str"
V_INT = "int"
V_FLOAT = "float"
V_BOOL = "bool"
V_EMAIL = "email"
V_URL = "url"
V_PORT = "port"

# Validator accepting exactly one of the type keywords above.
ADDON_ELEMENT = vol.In([
    V_STR,
    V_INT,
    V_FLOAT,
    V_BOOL,
    V_EMAIL,
    V_URL,
    V_PORT,
])

# Every architecture value accepted for ATTR_ARCH (also its default).
ARCH_ALL = [ARCH_ARMHF, ARCH_AARCH64, ARCH_AMD64, ARCH_I386]

# Every startup value accepted for ATTR_STARTUP after _simple_startup ran.
STARTUP_ALL = [
    STARTUP_ONCE,
    STARTUP_INITIALIZE,
    STARTUP_SYSTEM,
    STARTUP_SERVICES,
    STARTUP_APPLICATION,
]

# Values accepted in the ATTR_PRIVILEGED list
# (presumably Linux capability names - confirm against the Docker layer).
PRIVILEGED_ALL = ["NET_ADMIN", "SYS_ADMIN", "SYS_RAWIO"]
def _simple_startup(value):
    """Translate the shorthand startup values into their full constants.

    "before" becomes STARTUP_SERVICES and "after" becomes
    STARTUP_APPLICATION; every other value is handed back untouched so the
    next validator in the chain can accept or reject it.
    """
    if value == "before":
        resolved = STARTUP_SERVICES
    elif value == "after":
        resolved = STARTUP_APPLICATION
    else:
        resolved = value
    return resolved
# Schema for an add-on configuration.  Keys not listed here are tolerated
# (extra=vol.ALLOW_EXTRA).
# pylint: disable=no-value-for-parameter
SCHEMA_ADDON_CONFIG = vol.Schema({
    vol.Required(ATTR_NAME): vol.Coerce(str),
    vol.Required(ATTR_VERSION): vol.Coerce(str),
    vol.Required(ATTR_SLUG): vol.Coerce(str),
    vol.Required(ATTR_DESCRIPTON): vol.Coerce(str),
    vol.Optional(ATTR_URL): vol.Url(),
    vol.Optional(ATTR_ARCH, default=ARCH_ALL): [vol.In(ARCH_ALL)],
    # The shorthands "before"/"after" are translated by _simple_startup
    # before the value is checked against STARTUP_ALL.
    vol.Required(ATTR_STARTUP):
        vol.All(_simple_startup, vol.In(STARTUP_ALL)),
    vol.Required(ATTR_BOOT):
        vol.In([BOOT_AUTO, BOOT_MANUAL]),
    vol.Optional(ATTR_PORTS): DOCKER_PORTS,
    # http(s) URL template with literal [HOST] and [PORT:<number>] markers.
    vol.Optional(ATTR_WEBUI):
        vol.Match(r"^(?:https?):\/\/\[HOST\]:\[PORT:\d+\].*$"),
    vol.Optional(ATTR_HOST_NETWORK, default=False): vol.Boolean(),
    # Three colon-separated fields; the last one is 1-3 of the letters r/w/m.
    vol.Optional(ATTR_DEVICES): [vol.Match(r"^(.*):(.*):([rwm]{1,3})$")],
    # "size=<digits><k|m|g>" with optional ",uid=<1-4 digits>" and ",rw".
    # At least one digit is required so that a bare "size=k" is rejected.
    vol.Optional(ATTR_TMPFS):
        vol.Match(r"^size=\d+[kmg](,uid=\d{1,4})?(,rw)?$"),
    vol.Optional(ATTR_MAP, default=[]): [vol.Match(MAP_VOLUME)],
    # NOTE(review): r"\w*" is unanchored and may match the empty string, so
    # it does not appear to restrict the variable names - confirm intent.
    vol.Optional(ATTR_ENVIRONMENT): {vol.Match(r"\w*"): vol.Coerce(str)},
    vol.Optional(ATTR_PRIVILEGED): [vol.In(PRIVILEGED_ALL)],
    vol.Optional(ATTR_AUDIO, default=False): vol.Boolean(),
    vol.Optional(ATTR_HASSIO_API, default=False): vol.Boolean(),
    vol.Required(ATTR_OPTIONS): dict,
    # Options schema: either False, or a mapping from option name to
    #   - a type keyword (ADDON_ELEMENT),
    #   - a list whose entries are type keywords or dicts of type keywords,
    #   - a dict of nested option name -> type keyword.
    vol.Required(ATTR_SCHEMA): vol.Any(vol.Schema({
        vol.Coerce(str): vol.Any(ADDON_ELEMENT, [
            vol.Any(ADDON_ELEMENT, {vol.Coerce(str): ADDON_ELEMENT})
        ], vol.Schema({vol.Coerce(str): ADDON_ELEMENT}))
    }), False),
    # NOTE(review): pattern has no end anchor, so only a "<word>/<word>"
    # prefix is actually checked - confirm whether that is intended.
    vol.Optional(ATTR_IMAGE): vol.Match(r"\w*/\w*"),
    # Coerced to int and limited to the range 10..120 (default 10).
    vol.Optional(ATTR_TIMEOUT, default=10):
        vol.All(vol.Coerce(int), vol.Range(min=10, max=120))
}, extra=vol.ALLOW_EXTRA)
# Schema for a repository description: a mandatory name plus an optional
# URL and maintainer.  Unlisted keys are tolerated.
# pylint: disable=no-value-for-parameter
SCHEMA_REPOSITORY_CONFIG = vol.Schema(
    {
        vol.Required(ATTR_NAME): vol.Coerce(str),
        vol.Optional(ATTR_URL): vol.Url(),
        vol.Optional(ATTR_MAINTAINER): vol.Coerce(str),
    },
    extra=vol.ALLOW_EXTRA,
)
# Schema for the user-side data of one add-on: version, options and the
# user's boot/network/audio overrides.  No `extra` policy is passed here, so
# voluptuous' default handling of unlisted keys applies.
# pylint: disable=no-value-for-parameter
SCHEMA_ADDON_USER = vol.Schema(
    {
        vol.Required(ATTR_VERSION): vol.Coerce(str),
        vol.Optional(ATTR_OPTIONS, default={}): dict,
        vol.Optional(ATTR_AUTO_UPDATE, default=False): vol.Boolean(),
        vol.Optional(ATTR_BOOT): vol.In([BOOT_AUTO, BOOT_MANUAL]),
        vol.Optional(ATTR_NETWORK): DOCKER_PORTS,
        vol.Optional(ATTR_AUDIO_OUTPUT): ALSA_CHANNEL,
        vol.Optional(ATTR_AUDIO_INPUT): ALSA_CHANNEL,
    }
)
# System-side add-on data: everything from SCHEMA_ADDON_CONFIG plus the
# mandatory location and repository entries.
SCHEMA_ADDON_SYSTEM = SCHEMA_ADDON_CONFIG.extend(
    {
        vol.Required(ATTR_LOCATON): vol.Coerce(str),
        vol.Required(ATTR_REPOSITORY): vol.Coerce(str),
    }
)
# Schema for the add-on data file: two optional sections, each a mapping
# from a string key (presumably the add-on slug - confirm) to the user or
# system record.  Both sections default to an empty mapping.
SCHEMA_ADDON_FILE = vol.Schema(
    {
        vol.Optional(ATTR_USER, default={}): {
            vol.Coerce(str): SCHEMA_ADDON_USER,
        },
        vol.Optional(ATTR_SYSTEM, default={}): {
            vol.Coerce(str): SCHEMA_ADDON_SYSTEM,
        },
    }
)
# Schema for the add-on record stored in a snapshot: the user and system
# records, the state (started or stopped) and a version, all of them
# mandatory.
SCHEMA_ADDON_SNAPSHOT = vol.Schema(
    {
        vol.Required(ATTR_USER): SCHEMA_ADDON_USER,
        vol.Required(ATTR_SYSTEM): SCHEMA_ADDON_SYSTEM,
        vol.Required(ATTR_STATE): vol.In([STATE_STARTED, STATE_STOPPED]),
        vol.Required(ATTR_VERSION): vol.Coerce(str),
    }
)
def validate_options(raw_schema):
    """Build a validator for the options of one add-on.

    raw_schema maps each option name to its declared type: a type keyword
    (checked by _single_validate), a list whose first entry is the element
    type (checked by _nested_validate_list) or a dict of nested option names
    (checked by _nested_validate_dict).

    Returns a callable taking the options dict and returning a new dict with
    every value validated and coerced.  The callable raises vol.Invalid when
    the options are not a dict, when an option is not part of raw_schema,
    when a value does not fit the structure of its declared type, or when a
    per-type validator rejects a value.
    """
    def validate(struct):
        """Validate struct against raw_schema and return the coerced copy."""
        # A non-dict payload has no .items(); report it as invalid input
        # instead of letting an AttributeError escape.
        if not isinstance(struct, dict):
            raise vol.Invalid("Options must be a dict.")

        options = {}

        # read options
        for key, value in struct.items():
            if key not in raw_schema:
                raise vol.Invalid("Unknown options {}.".format(key))

            typ = raw_schema[key]
            try:
                if isinstance(typ, list):
                    # nested value list, typ[0] is the element type
                    options[key] = _nested_validate_list(typ[0], value, key)
                elif isinstance(typ, dict):
                    # nested value dict
                    options[key] = _nested_validate_dict(typ, value, key)
                else:
                    # normal value
                    options[key] = _single_validate(typ, value, key)
            except (IndexError, KeyError, TypeError, AttributeError):
                # IndexError: empty list in the schema.  TypeError and
                # AttributeError: the value has the wrong structure for its
                # declared type (e.g. a number where a list or dict is
                # expected).  All of them are validation failures.
                raise vol.Invalid(
                    "Type error for {}.".format(key)) from None

        return options

    return validate
# pylint: disable=no-value-for-parameter
def _single_validate(typ, value, key):
    """Validate and coerce a single option value.

    typ is one of the V_* type keywords, value the raw option value and key
    the option name used in error messages.  Returns the coerced value.

    Raises vol.Invalid when value is None, when value cannot be converted to
    the requested type, or when typ is not a known type keyword.
    """
    # A None value is reported as a missing required option.
    if value is None:
        raise vol.Invalid("Missing required option '{}'.".format(key))

    try:
        if typ == V_STR:
            return str(value)
        if typ == V_INT:
            return int(value)
        if typ == V_FLOAT:
            return float(value)
        if typ == V_BOOL:
            return vol.Boolean()(value)
        if typ == V_EMAIL:
            return vol.Email()(value)
        if typ == V_URL:
            return vol.Url()(value)
        if typ == V_PORT:
            return NETWORK_PORT(value)
    except (ValueError, TypeError):
        # int()/float() raise TypeError rather than ValueError for values
        # such as lists or dicts; both mean the value has the wrong type.
        raise vol.Invalid(
            "Type {} error for '{}' on {}.".format(typ, value, key)) from None

    # No branch matched: the schema contains an unknown type keyword.
    raise vol.Invalid("Fatal error for {} type {}".format(key, typ))
def _nested_validate_list(typ, data_list, key):
    """Validate every element of a list-typed option.

    typ is the declared element type: either a type keyword or a dict
    mapping nested option names to type keywords.  data_list is the raw
    option value and key the option name used in error messages.

    Returns a new list holding the validated elements.  Raises vol.Invalid
    when data_list is not a list, when an element of a dict-typed list is
    not a dict or carries an undeclared key, or when an element is rejected
    by _single_validate.
    """
    # Reject non-list values up front: a string would otherwise be iterated
    # character by character and a number would fail with TypeError.
    if not isinstance(data_list, list):
        raise vol.Invalid("Invalid list for {}".format(key))

    # normal list
    if not isinstance(typ, dict):
        return [_single_validate(typ, element, key) for element in data_list]

    # dict list
    options = []
    for element in data_list:
        if not isinstance(element, dict):
            raise vol.Invalid("Invalid dict in list for {}".format(key))

        c_options = {}
        for c_key, c_value in element.items():
            if c_key not in typ:
                raise vol.Invalid(
                    "Unknown nested options {}".format(c_key))
            c_options[c_key] = _single_validate(typ[c_key], c_value, c_key)
        options.append(c_options)

    return options
def _nested_validate_dict(typ, data_dict, key):
    """Validate every entry of a dict-typed option.

    typ maps the allowed nested option names to type keywords, data_dict is
    the raw option value and key the option name used in error messages.

    Returns a new dict holding the validated entries.  Raises vol.Invalid
    when data_dict is not a dict, when it carries a key that typ does not
    declare, or when a value is rejected by _single_validate.
    """
    # A non-dict value has no .items(); report it as a validation error
    # instead of letting an AttributeError escape.
    if not isinstance(data_dict, dict):
        raise vol.Invalid("Invalid dict for {}".format(key))

    options = {}
    for c_key, c_value in data_dict.items():
        if c_key not in typ:
            raise vol.Invalid("Unknown nested dict options {}".format(c_key))
        options[c_key] = _single_validate(typ[c_key], c_value, c_key)

    return options