mirror of
https://github.com/home-assistant/supervisor.git
synced 2025-11-09 02:49:43 +00:00
Add secrets support for options (#1283)
* Add secrets API * Don't expose secrets
This commit is contained in:
@@ -109,16 +109,21 @@ V_EMAIL = "email"
|
||||
V_URL = "url"
|
||||
V_PORT = "port"
|
||||
V_MATCH = "match"
|
||||
V_LIST = "list"
|
||||
|
||||
RE_SCHEMA_ELEMENT = re.compile(
|
||||
r"^(?:"
|
||||
r"|str|bool|email|url|port"
|
||||
r"|bool|email|url|port"
|
||||
r"|str(?:\((?P<s_min>\d+)?,(?P<s_max>\d+)?\))?"
|
||||
r"|int(?:\((?P<i_min>\d+)?,(?P<i_max>\d+)?\))?"
|
||||
r"|float(?:\((?P<f_min>[\d\.]+)?,(?P<f_max>[\d\.]+)?\))?"
|
||||
r"|match\((?P<match>.*)\)"
|
||||
r"|list\((?P<list>.+)\)"
|
||||
r")\??$"
|
||||
)
|
||||
|
||||
_SCHEMA_LENGTH_PARTS = ("i_min", "i_max", "f_min", "f_max", "s_min", "s_max")
|
||||
|
||||
RE_DOCKER_IMAGE = re.compile(r"^([a-zA-Z\-\.:\d{}]+/)*?([\-\w{}]+)/([\-\w{}]+)$")
|
||||
RE_DOCKER_IMAGE_BUILD = re.compile(
|
||||
r"^([a-zA-Z\-\.:\d{}]+/)*?([\-\w{}]+)/([\-\w{}]+)(:[\.\-\w{}]+)?$"
|
||||
@@ -305,7 +310,7 @@ SCHEMA_ADDON_SNAPSHOT = vol.Schema(
|
||||
)
|
||||
|
||||
|
||||
def validate_options(raw_schema):
|
||||
def validate_options(coresys, raw_schema):
|
||||
"""Validate schema."""
|
||||
|
||||
def validate(struct):
|
||||
@@ -323,13 +328,13 @@ def validate_options(raw_schema):
|
||||
try:
|
||||
if isinstance(typ, list):
|
||||
# nested value list
|
||||
options[key] = _nested_validate_list(typ[0], value, key)
|
||||
options[key] = _nested_validate_list(coresys, typ[0], value, key)
|
||||
elif isinstance(typ, dict):
|
||||
# nested value dict
|
||||
options[key] = _nested_validate_dict(typ, value, key)
|
||||
options[key] = _nested_validate_dict(coresys, typ, value, key)
|
||||
else:
|
||||
# normal value
|
||||
options[key] = _single_validate(typ, value, key)
|
||||
options[key] = _single_validate(coresys, typ, value, key)
|
||||
except (IndexError, KeyError):
|
||||
raise vol.Invalid(f"Type error for {key}") from None
|
||||
|
||||
@@ -341,24 +346,29 @@ def validate_options(raw_schema):
|
||||
|
||||
# pylint: disable=no-value-for-parameter
|
||||
# pylint: disable=inconsistent-return-statements
|
||||
def _single_validate(typ, value, key):
|
||||
def _single_validate(coresys, typ, value, key):
|
||||
"""Validate a single element."""
|
||||
# if required argument
|
||||
if value is None:
|
||||
raise vol.Invalid(f"Missing required option '{key}'")
|
||||
|
||||
# Lookup secret
|
||||
if str(value).startswith("!secret "):
|
||||
secret: str = value.partition(" ")[2]
|
||||
value = coresys.secrets.get(secret)
|
||||
|
||||
# parse extend data from type
|
||||
match = RE_SCHEMA_ELEMENT.match(typ)
|
||||
|
||||
# prepare range
|
||||
range_args = {}
|
||||
for group_name in ("i_min", "i_max", "f_min", "f_max"):
|
||||
for group_name in _SCHEMA_LENGTH_PARTS:
|
||||
group_value = match.group(group_name)
|
||||
if group_value:
|
||||
range_args[group_name[2:]] = float(group_value)
|
||||
|
||||
if typ.startswith(V_STR):
|
||||
return str(value)
|
||||
return vol.All(str(value), vol.Range(**range_args))(value)
|
||||
elif typ.startswith(V_INT):
|
||||
return vol.All(vol.Coerce(int), vol.Range(**range_args))(value)
|
||||
elif typ.startswith(V_FLOAT):
|
||||
@@ -373,26 +383,28 @@ def _single_validate(typ, value, key):
|
||||
return NETWORK_PORT(value)
|
||||
elif typ.startswith(V_MATCH):
|
||||
return vol.Match(match.group("match"))(str(value))
|
||||
elif typ.strartswith(V_LIST):
|
||||
return vol.In(match.group("list").split("|"))(str(value))
|
||||
|
||||
raise vol.Invalid(f"Fatal error for {key} type {typ}")
|
||||
|
||||
|
||||
def _nested_validate_list(typ, data_list, key):
|
||||
def _nested_validate_list(coresys, typ, data_list, key):
|
||||
"""Validate nested items."""
|
||||
options = []
|
||||
|
||||
for element in data_list:
|
||||
# Nested?
|
||||
if isinstance(typ, dict):
|
||||
c_options = _nested_validate_dict(typ, element, key)
|
||||
c_options = _nested_validate_dict(coresys, typ, element, key)
|
||||
options.append(c_options)
|
||||
else:
|
||||
options.append(_single_validate(typ, element, key))
|
||||
options.append(_single_validate(coresys, typ, element, key))
|
||||
|
||||
return options
|
||||
|
||||
|
||||
def _nested_validate_dict(typ, data_dict, key):
|
||||
def _nested_validate_dict(coresys, typ, data_dict, key):
|
||||
"""Validate nested items."""
|
||||
options = {}
|
||||
|
||||
@@ -404,9 +416,11 @@ def _nested_validate_dict(typ, data_dict, key):
|
||||
|
||||
# Nested?
|
||||
if isinstance(typ[c_key], list):
|
||||
options[c_key] = _nested_validate_list(typ[c_key][0], c_value, c_key)
|
||||
options[c_key] = _nested_validate_list(
|
||||
coresys, typ[c_key][0], c_value, c_key
|
||||
)
|
||||
else:
|
||||
options[c_key] = _single_validate(typ[c_key], c_value, c_key)
|
||||
options[c_key] = _single_validate(coresys, typ[c_key], c_value, c_key)
|
||||
|
||||
_check_missing_options(typ, options, key)
|
||||
return options
|
||||
|
||||
Reference in New Issue
Block a user