Add black support (#1101)

This commit is contained in:
Pascal Vizeli 2019-05-27 12:35:06 +02:00 committed by GitHub
parent 496cee1ec4
commit b4665f3907
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
46 changed files with 964 additions and 791 deletions

View File

@ -39,6 +39,20 @@ jobs:
displayName: 'Run Tox'
- job: 'Black'
pool:
vmImage: 'ubuntu-16.04'
steps:
- task: UsePythonVersion@0
displayName: 'Use Python $(python.version)'
inputs:
versionSpec: '3.7'
- script: pip install black
displayName: 'Install black'
- script: black --check hassio
displayName: 'Run Black'
- job: 'JQ'
pool:
vmImage: 'ubuntu-16.04'
@ -88,7 +102,7 @@ jobs:
sudo apt-get install -y --no-install-recommends \
qemu-user-static \
binfmt-support
sudo mount binfmt_misc -t binfmt_misc /proc/sys/fs/binfmt_misc
sudo update-binfmts --enable qemu-arm
sudo update-binfmts --enable qemu-aarch64

View File

@ -110,16 +110,14 @@ class AddonManager(CoreSysAttributes):
raise AddonsError()
if not store.available:
_LOGGER.error(
"Add-on %s not supported on that platform", slug)
_LOGGER.error("Add-on %s not supported on that platform", slug)
raise AddonsNotSupportedError()
self.data.install(store)
addon = Addon(self.coresys, slug)
if not addon.path_data.is_dir():
_LOGGER.info(
"Create Home Assistant add-on data folder %s", addon.path_data)
_LOGGER.info("Create Home Assistant add-on data folder %s", addon.path_data)
addon.path_data.mkdir()
# Setup/Fix AppArmor profile
@ -179,8 +177,7 @@ class AddonManager(CoreSysAttributes):
# Check if available, Maybe something have changed
if not store.available:
_LOGGER.error(
"Add-on %s not supported on that platform", slug)
_LOGGER.error("Add-on %s not supported on that platform", slug)
raise AddonsNotSupportedError()
# Update instance

View File

@ -59,7 +59,8 @@ _LOGGER = logging.getLogger(__name__)
RE_WEBUI = re.compile(
r"^(?:(?P<s_prefix>https?)|\[PROTO:(?P<t_proto>\w+)\])"
r":\/\/\[HOST\]:\[PORT:(?P<t_port>\d+)\](?P<s_suffix>.*)$")
r":\/\/\[HOST\]:\[PORT:(?P<t_port>\d+)\](?P<s_suffix>.*)$"
)
class Addon(AddonModel):
@ -121,10 +122,7 @@ class Addon(AddonModel):
@property
def options(self) -> Dict[str, Any]:
"""Return options with local changes."""
return {
**self.data[ATTR_OPTIONS],
**self.persist[ATTR_OPTIONS]
}
return {**self.data[ATTR_OPTIONS], **self.persist[ATTR_OPTIONS]}
@options.setter
def options(self, value: Optional[Dict[str, Any]]):
@ -231,10 +229,10 @@ class Addon(AddonModel):
webui = RE_WEBUI.match(url)
# extract arguments
t_port = webui.group('t_port')
t_proto = webui.group('t_proto')
s_prefix = webui.group('s_prefix') or ""
s_suffix = webui.group('s_suffix') or ""
t_port = webui.group("t_port")
t_proto = webui.group("t_proto")
s_prefix = webui.group("s_prefix") or ""
s_suffix = webui.group("s_suffix") or ""
# search host port for this docker port
if self.ports is None:
@ -248,7 +246,7 @@ class Addon(AddonModel):
# lookup the correct protocol from config
if t_proto:
proto = 'https' if self.options[t_proto] else 'http'
proto = "https" if self.options[t_proto] else "http"
else:
proto = s_prefix
@ -353,8 +351,11 @@ class Addon(AddonModel):
schema(options)
write_json_file(self.path_options, options)
except vol.Invalid as ex:
_LOGGER.error("Add-on %s have wrong options: %s", self.slug,
humanize_error(options, ex))
_LOGGER.error(
"Add-on %s have wrong options: %s",
self.slug,
humanize_error(options, ex),
)
except JsonFileError:
_LOGGER.error("Add-on %s can't write options", self.slug)
else:
@ -381,10 +382,11 @@ class Addon(AddonModel):
def write_asound(self):
"""Write asound config to file and return True on success."""
asound_config = self.sys_host.alsa.asound(
alsa_input=self.audio_input, alsa_output=self.audio_output)
alsa_input=self.audio_input, alsa_output=self.audio_output
)
try:
with self.path_asound.open('w') as config_file:
with self.path_asound.open("w") as config_file:
config_file.write(asound_config)
except OSError as err:
_LOGGER.error("Add-on %s can't write asound: %s", self.slug, err)
@ -408,7 +410,7 @@ class Addon(AddonModel):
# Need install/update
with TemporaryDirectory(dir=self.sys_config.path_tmp) as tmp_folder:
profile_file = Path(tmp_folder, 'apparmor.txt')
profile_file = Path(tmp_folder, "apparmor.txt")
adjust_profile(self.slug, self.path_apparmor, profile_file)
await self.sys_host.apparmor.load_profile(self.slug, profile_file)
@ -430,14 +432,10 @@ class Addon(AddonModel):
return True
# merge options
options = {
**self.persist[ATTR_OPTIONS],
**default_options,
}
options = {**self.persist[ATTR_OPTIONS], **default_options}
# create voluptuous
new_schema = \
vol.Schema(vol.All(dict, validate_options(new_raw_schema)))
new_schema = vol.Schema(vol.All(dict, validate_options(new_raw_schema)))
# validate
try:
@ -525,7 +523,7 @@ class Addon(AddonModel):
# store local image
if self.need_build:
try:
await self.instance.export_image(Path(temp, 'image.tar'))
await self.instance.export_image(Path(temp, "image.tar"))
except DockerAPIError:
raise AddonsError() from None
@ -538,14 +536,14 @@ class Addon(AddonModel):
# Store local configs/state
try:
write_json_file(Path(temp, 'addon.json'), data)
write_json_file(Path(temp, "addon.json"), data)
except JsonFileError:
_LOGGER.error("Can't save meta for %s", self.slug)
raise AddonsError() from None
# Store AppArmor Profile
if self.sys_host.apparmor.exists(self.slug):
profile = Path(temp, 'apparmor.txt')
profile = Path(temp, "apparmor.txt")
try:
self.sys_host.apparmor.backup_profile(self.slug, profile)
except HostAppArmorError:
@ -585,7 +583,7 @@ class Addon(AddonModel):
# Read snapshot data
try:
data = read_json_file(Path(temp, 'addon.json'))
data = read_json_file(Path(temp, "addon.json"))
except JsonFileError:
raise AddonsError() from None
@ -593,8 +591,11 @@ class Addon(AddonModel):
try:
data = SCHEMA_ADDON_SNAPSHOT(data)
except vol.Invalid as err:
_LOGGER.error("Can't validate %s, snapshot data: %s",
self.slug, humanize_error(data, err))
_LOGGER.error(
"Can't validate %s, snapshot data: %s",
self.slug,
humanize_error(data, err),
)
raise AddonsError() from None
# If available
@ -605,14 +606,16 @@ class Addon(AddonModel):
# Restore local add-on informations
_LOGGER.info("Restore config for addon %s", self.slug)
restore_image = self._image(data[ATTR_SYSTEM])
self.sys_addons.data.restore(self.slug, data[ATTR_USER], data[ATTR_SYSTEM], restore_image)
self.sys_addons.data.restore(
self.slug, data[ATTR_USER], data[ATTR_SYSTEM], restore_image
)
# Check version / restore image
version = data[ATTR_VERSION]
if not await self.instance.exists():
_LOGGER.info("Restore/Install image for addon %s", self.slug)
image_file = Path(temp, 'image.tar')
image_file = Path(temp, "image.tar")
if image_file.is_file():
with suppress(DockerAPIError):
await self.instance.import_image(image_file, version)
@ -643,11 +646,10 @@ class Addon(AddonModel):
raise AddonsError() from None
# Restore AppArmor
profile_file = Path(temp, 'apparmor.txt')
profile_file = Path(temp, "apparmor.txt")
if profile_file.exists():
try:
await self.sys_host.apparmor.load_profile(
self.slug, profile_file)
await self.sys_host.apparmor.load_profile(self.slug, profile_file)
except HostAppArmorError:
_LOGGER.error("Can't restore AppArmor profile")
raise AddonsError() from None

View File

@ -21,7 +21,8 @@ class AddonBuild(JsonConfig, CoreSysAttributes):
self.addon = addon
super().__init__(
Path(self.addon.path_location, 'build.json'), SCHEMA_BUILD_CONFIG)
Path(self.addon.path_location, "build.json"), SCHEMA_BUILD_CONFIG
)
def save_data(self):
"""Ignore save function."""
@ -31,8 +32,8 @@ class AddonBuild(JsonConfig, CoreSysAttributes):
def base_image(self) -> str:
"""Base images for this add-on."""
return self._data[ATTR_BUILD_FROM].get(
self.sys_arch.default,
f"homeassistant/{self.sys_arch.default}-base:latest")
self.sys_arch.default, f"homeassistant/{self.sys_arch.default}-base:latest"
)
@property
def squash(self) -> bool:
@ -47,28 +48,28 @@ class AddonBuild(JsonConfig, CoreSysAttributes):
def get_docker_args(self, version):
"""Create a dict with Docker build arguments."""
args = {
'path': str(self.addon.path_location),
'tag': f"{self.addon.image}:{version}",
'pull': True,
'forcerm': True,
'squash': self.squash,
'labels': {
'io.hass.version': version,
'io.hass.arch': self.sys_arch.default,
'io.hass.type': META_ADDON,
'io.hass.name': self._fix_label('name'),
'io.hass.description': self._fix_label('description'),
"path": str(self.addon.path_location),
"tag": f"{self.addon.image}:{version}",
"pull": True,
"forcerm": True,
"squash": self.squash,
"labels": {
"io.hass.version": version,
"io.hass.arch": self.sys_arch.default,
"io.hass.type": META_ADDON,
"io.hass.name": self._fix_label("name"),
"io.hass.description": self._fix_label("description"),
},
'buildargs': {
'BUILD_FROM': self.base_image,
'BUILD_VERSION': version,
'BUILD_ARCH': self.sys_arch.default,
"buildargs": {
"BUILD_FROM": self.base_image,
"BUILD_VERSION": version,
"BUILD_ARCH": self.sys_arch.default,
**self.additional_args,
}
},
}
if self.addon.url:
args['labels']['io.hass.url'] = self.addon.url
args["labels"]["io.hass.url"] = self.addon.url
return args

View File

@ -59,10 +59,9 @@ class AddonsData(JsonConfig, CoreSysAttributes):
def update(self, addon: AddonStore) -> None:
"""Update version of add-on."""
self.system[addon.slug] = deepcopy(addon.data)
self.user[addon.slug].update({
ATTR_VERSION: addon.version,
ATTR_IMAGE: addon.image,
})
self.user[addon.slug].update(
{ATTR_VERSION: addon.version, ATTR_IMAGE: addon.image}
)
self.save_data()
def restore(self, slug: str, user: Config, system: Config, image: str) -> None:

View File

@ -142,14 +142,14 @@ class AddonModel(CoreSysAttributes):
@property
def long_description(self) -> Optional[str]:
"""Return README.md as long_description."""
readme = Path(self.path_location, 'README.md')
readme = Path(self.path_location, "README.md")
# If readme not exists
if not readme.exists():
return None
# Return data
with readme.open('r') as readme_file:
with readme.open("r") as readme_file:
return readme_file.read()
@property
@ -185,7 +185,7 @@ class AddonModel(CoreSysAttributes):
services = {}
for data in services_list:
service = RE_SERVICE.match(data)
services[service.group('service')] = service.group('rights')
services[service.group("service")] = service.group("rights")
return services
@ -409,7 +409,7 @@ class AddonModel(CoreSysAttributes):
volumes = {}
for volume in self.data[ATTR_MAP]:
result = RE_VOLUME.match(volume)
volumes[result.group(1)] = result.group(2) or 'ro'
volumes[result.group(1)] = result.group(2) or "ro"
return volumes
@ -421,22 +421,22 @@ class AddonModel(CoreSysAttributes):
@property
def path_icon(self) -> Path:
"""Return path to add-on icon."""
return Path(self.path_location, 'icon.png')
return Path(self.path_location, "icon.png")
@property
def path_logo(self) -> Path:
"""Return path to add-on logo."""
return Path(self.path_location, 'logo.png')
return Path(self.path_location, "logo.png")
@property
def path_changelog(self) -> Path:
"""Return path to add-on changelog."""
return Path(self.path_location, 'CHANGELOG.md')
return Path(self.path_location, "CHANGELOG.md")
@property
def path_apparmor(self) -> Path:
"""Return path to custom AppArmor profile."""
return Path(self.path_location, 'apparmor.txt')
return Path(self.path_location, "apparmor.txt")
@property
def schema(self) -> vol.Schema:

View File

@ -100,14 +100,14 @@ _LOGGER = logging.getLogger(__name__)
RE_VOLUME = re.compile(r"^(config|ssl|addons|backup|share)(?::(rw|ro))?$")
RE_SERVICE = re.compile(r"^(?P<service>mqtt):(?P<rights>provide|want|need)$")
V_STR = 'str'
V_INT = 'int'
V_FLOAT = 'float'
V_BOOL = 'bool'
V_EMAIL = 'email'
V_URL = 'url'
V_PORT = 'port'
V_MATCH = 'match'
V_STR = "str"
V_INT = "int"
V_FLOAT = "float"
V_BOOL = "bool"
V_EMAIL = "email"
V_URL = "url"
V_PORT = "port"
V_MATCH = "match"
RE_SCHEMA_ELEMENT = re.compile(
r"^(?:"
@ -118,18 +118,28 @@ RE_SCHEMA_ELEMENT = re.compile(
r")\??$"
)
RE_DOCKER_IMAGE = re.compile(
r"^([a-zA-Z\-\.:\d{}]+/)*?([\-\w{}]+)/([\-\w{}]+)$")
RE_DOCKER_IMAGE = re.compile(r"^([a-zA-Z\-\.:\d{}]+/)*?([\-\w{}]+)/([\-\w{}]+)$")
RE_DOCKER_IMAGE_BUILD = re.compile(
r"^([a-zA-Z\-\.:\d{}]+/)*?([\-\w{}]+)/([\-\w{}]+)(:[\.\-\w{}]+)?$")
r"^([a-zA-Z\-\.:\d{}]+/)*?([\-\w{}]+)/([\-\w{}]+)(:[\.\-\w{}]+)?$"
)
SCHEMA_ELEMENT = vol.Match(RE_SCHEMA_ELEMENT)
MACHINE_ALL = [
'intel-nuc', 'odroid-c2', 'odroid-xu', 'orangepi-prime', 'qemux86',
'qemux86-64', 'qemuarm', 'qemuarm-64', 'raspberrypi', 'raspberrypi2',
'raspberrypi3', 'raspberrypi3-64', 'tinker',
"intel-nuc",
"odroid-c2",
"odroid-xu",
"orangepi-prime",
"qemux86",
"qemux86-64",
"qemuarm",
"qemuarm-64",
"raspberrypi",
"raspberrypi2",
"raspberrypi3",
"raspberrypi3-64",
"tinker",
]
@ -143,130 +153,157 @@ def _simple_startup(value):
# pylint: disable=no-value-for-parameter
SCHEMA_ADDON_CONFIG = vol.Schema({
vol.Required(ATTR_NAME): vol.Coerce(str),
vol.Required(ATTR_VERSION): vol.Coerce(str),
vol.Required(ATTR_SLUG): vol.Coerce(str),
vol.Required(ATTR_DESCRIPTON): vol.Coerce(str),
vol.Required(ATTR_ARCH): [vol.In(ARCH_ALL)],
vol.Optional(ATTR_MACHINE): [vol.In(MACHINE_ALL)],
vol.Optional(ATTR_URL): vol.Url(),
vol.Required(ATTR_STARTUP):
vol.All(_simple_startup, vol.In(STARTUP_ALL)),
vol.Required(ATTR_BOOT):
vol.In([BOOT_AUTO, BOOT_MANUAL]),
vol.Optional(ATTR_PORTS): DOCKER_PORTS,
vol.Optional(ATTR_PORTS_DESCRIPTION): DOCKER_PORTS_DESCRIPTION,
vol.Optional(ATTR_WEBUI):
vol.Match(r"^(?:https?|\[PROTO:\w+\]):\/\/\[HOST\]:\[PORT:\d+\].*$"),
vol.Optional(ATTR_INGRESS, default=False): vol.Boolean(),
vol.Optional(ATTR_INGRESS_PORT, default=8099): vol.Any(NETWORK_PORT, vol.Equal(0)),
vol.Optional(ATTR_INGRESS_ENTRY): vol.Coerce(str),
vol.Optional(ATTR_PANEL_ICON, default="mdi:puzzle"): vol.Coerce(str),
vol.Optional(ATTR_PANEL_TITLE): vol.Coerce(str),
vol.Optional(ATTR_PANEL_ADMIN, default=True): vol.Boolean(),
vol.Optional(ATTR_HOMEASSISTANT): vol.Maybe(vol.Coerce(str)),
vol.Optional(ATTR_HOST_NETWORK, default=False): vol.Boolean(),
vol.Optional(ATTR_HOST_PID, default=False): vol.Boolean(),
vol.Optional(ATTR_HOST_IPC, default=False): vol.Boolean(),
vol.Optional(ATTR_HOST_DBUS, default=False): vol.Boolean(),
vol.Optional(ATTR_DEVICES): [vol.Match(r"^(.*):(.*):([rwm]{1,3})$")],
vol.Optional(ATTR_AUTO_UART, default=False): vol.Boolean(),
vol.Optional(ATTR_TMPFS):
vol.Match(r"^size=(\d)*[kmg](,uid=\d{1,4})?(,rw)?$"),
vol.Optional(ATTR_MAP, default=list): [vol.Match(RE_VOLUME)],
vol.Optional(ATTR_ENVIRONMENT): {vol.Match(r"\w*"): vol.Coerce(str)},
vol.Optional(ATTR_PRIVILEGED): [vol.In(PRIVILEGED_ALL)],
vol.Optional(ATTR_APPARMOR, default=True): vol.Boolean(),
vol.Optional(ATTR_FULL_ACCESS, default=False): vol.Boolean(),
vol.Optional(ATTR_AUDIO, default=False): vol.Boolean(),
vol.Optional(ATTR_GPIO, default=False): vol.Boolean(),
vol.Optional(ATTR_DEVICETREE, default=False): vol.Boolean(),
vol.Optional(ATTR_KERNEL_MODULES, default=False): vol.Boolean(),
vol.Optional(ATTR_HASSIO_API, default=False): vol.Boolean(),
vol.Optional(ATTR_HASSIO_ROLE, default=ROLE_DEFAULT): vol.In(ROLE_ALL),
vol.Optional(ATTR_HOMEASSISTANT_API, default=False): vol.Boolean(),
vol.Optional(ATTR_STDIN, default=False): vol.Boolean(),
vol.Optional(ATTR_LEGACY, default=False): vol.Boolean(),
vol.Optional(ATTR_DOCKER_API, default=False): vol.Boolean(),
vol.Optional(ATTR_AUTH_API, default=False): vol.Boolean(),
vol.Optional(ATTR_SERVICES): [vol.Match(RE_SERVICE)],
vol.Optional(ATTR_DISCOVERY): [valid_discovery_service],
vol.Required(ATTR_OPTIONS): dict,
vol.Required(ATTR_SCHEMA): vol.Any(vol.Schema({
vol.Coerce(str): vol.Any(SCHEMA_ELEMENT, [
vol.Any(
SCHEMA_ELEMENT,
{vol.Coerce(str): vol.Any(SCHEMA_ELEMENT, [SCHEMA_ELEMENT])}
SCHEMA_ADDON_CONFIG = vol.Schema(
{
vol.Required(ATTR_NAME): vol.Coerce(str),
vol.Required(ATTR_VERSION): vol.Coerce(str),
vol.Required(ATTR_SLUG): vol.Coerce(str),
vol.Required(ATTR_DESCRIPTON): vol.Coerce(str),
vol.Required(ATTR_ARCH): [vol.In(ARCH_ALL)],
vol.Optional(ATTR_MACHINE): [vol.In(MACHINE_ALL)],
vol.Optional(ATTR_URL): vol.Url(),
vol.Required(ATTR_STARTUP): vol.All(_simple_startup, vol.In(STARTUP_ALL)),
vol.Required(ATTR_BOOT): vol.In([BOOT_AUTO, BOOT_MANUAL]),
vol.Optional(ATTR_PORTS): DOCKER_PORTS,
vol.Optional(ATTR_PORTS_DESCRIPTION): DOCKER_PORTS_DESCRIPTION,
vol.Optional(ATTR_WEBUI): vol.Match(
r"^(?:https?|\[PROTO:\w+\]):\/\/\[HOST\]:\[PORT:\d+\].*$"
),
vol.Optional(ATTR_INGRESS, default=False): vol.Boolean(),
vol.Optional(ATTR_INGRESS_PORT, default=8099): vol.Any(
NETWORK_PORT, vol.Equal(0)
),
vol.Optional(ATTR_INGRESS_ENTRY): vol.Coerce(str),
vol.Optional(ATTR_PANEL_ICON, default="mdi:puzzle"): vol.Coerce(str),
vol.Optional(ATTR_PANEL_TITLE): vol.Coerce(str),
vol.Optional(ATTR_PANEL_ADMIN, default=True): vol.Boolean(),
vol.Optional(ATTR_HOMEASSISTANT): vol.Maybe(vol.Coerce(str)),
vol.Optional(ATTR_HOST_NETWORK, default=False): vol.Boolean(),
vol.Optional(ATTR_HOST_PID, default=False): vol.Boolean(),
vol.Optional(ATTR_HOST_IPC, default=False): vol.Boolean(),
vol.Optional(ATTR_HOST_DBUS, default=False): vol.Boolean(),
vol.Optional(ATTR_DEVICES): [vol.Match(r"^(.*):(.*):([rwm]{1,3})$")],
vol.Optional(ATTR_AUTO_UART, default=False): vol.Boolean(),
vol.Optional(ATTR_TMPFS): vol.Match(r"^size=(\d)*[kmg](,uid=\d{1,4})?(,rw)?$"),
vol.Optional(ATTR_MAP, default=list): [vol.Match(RE_VOLUME)],
vol.Optional(ATTR_ENVIRONMENT): {vol.Match(r"\w*"): vol.Coerce(str)},
vol.Optional(ATTR_PRIVILEGED): [vol.In(PRIVILEGED_ALL)],
vol.Optional(ATTR_APPARMOR, default=True): vol.Boolean(),
vol.Optional(ATTR_FULL_ACCESS, default=False): vol.Boolean(),
vol.Optional(ATTR_AUDIO, default=False): vol.Boolean(),
vol.Optional(ATTR_GPIO, default=False): vol.Boolean(),
vol.Optional(ATTR_DEVICETREE, default=False): vol.Boolean(),
vol.Optional(ATTR_KERNEL_MODULES, default=False): vol.Boolean(),
vol.Optional(ATTR_HASSIO_API, default=False): vol.Boolean(),
vol.Optional(ATTR_HASSIO_ROLE, default=ROLE_DEFAULT): vol.In(ROLE_ALL),
vol.Optional(ATTR_HOMEASSISTANT_API, default=False): vol.Boolean(),
vol.Optional(ATTR_STDIN, default=False): vol.Boolean(),
vol.Optional(ATTR_LEGACY, default=False): vol.Boolean(),
vol.Optional(ATTR_DOCKER_API, default=False): vol.Boolean(),
vol.Optional(ATTR_AUTH_API, default=False): vol.Boolean(),
vol.Optional(ATTR_SERVICES): [vol.Match(RE_SERVICE)],
vol.Optional(ATTR_DISCOVERY): [valid_discovery_service],
vol.Required(ATTR_OPTIONS): dict,
vol.Required(ATTR_SCHEMA): vol.Any(
vol.Schema(
{
vol.Coerce(str): vol.Any(
SCHEMA_ELEMENT,
[
vol.Any(
SCHEMA_ELEMENT,
{
vol.Coerce(str): vol.Any(
SCHEMA_ELEMENT, [SCHEMA_ELEMENT]
)
},
)
],
vol.Schema(
{vol.Coerce(str): vol.Any(SCHEMA_ELEMENT, [SCHEMA_ELEMENT])}
),
)
}
),
], vol.Schema({
vol.Coerce(str): vol.Any(SCHEMA_ELEMENT, [SCHEMA_ELEMENT])
}))
}), False),
vol.Optional(ATTR_IMAGE):
vol.Match(RE_DOCKER_IMAGE),
vol.Optional(ATTR_TIMEOUT, default=10):
vol.All(vol.Coerce(int), vol.Range(min=10, max=120)),
}, extra=vol.REMOVE_EXTRA)
# pylint: disable=no-value-for-parameter
SCHEMA_BUILD_CONFIG = vol.Schema({
vol.Optional(ATTR_BUILD_FROM, default=dict): vol.Schema({
vol.In(ARCH_ALL): vol.Match(RE_DOCKER_IMAGE_BUILD),
}),
vol.Optional(ATTR_SQUASH, default=False): vol.Boolean(),
vol.Optional(ATTR_ARGS, default=dict): vol.Schema({
vol.Coerce(str): vol.Coerce(str)
}),
}, extra=vol.REMOVE_EXTRA)
# pylint: disable=no-value-for-parameter
SCHEMA_ADDON_USER = vol.Schema({
vol.Required(ATTR_VERSION): vol.Coerce(str),
vol.Optional(ATTR_IMAGE): vol.Coerce(str),
vol.Optional(ATTR_UUID, default=lambda: uuid.uuid4().hex): UUID_MATCH,
vol.Optional(ATTR_ACCESS_TOKEN): TOKEN,
vol.Optional(ATTR_INGRESS_TOKEN, default=secrets.token_urlsafe): vol.Coerce(str),
vol.Optional(ATTR_OPTIONS, default=dict): dict,
vol.Optional(ATTR_AUTO_UPDATE, default=False): vol.Boolean(),
vol.Optional(ATTR_BOOT):
vol.In([BOOT_AUTO, BOOT_MANUAL]),
vol.Optional(ATTR_NETWORK): DOCKER_PORTS,
vol.Optional(ATTR_AUDIO_OUTPUT): ALSA_DEVICE,
vol.Optional(ATTR_AUDIO_INPUT): ALSA_DEVICE,
vol.Optional(ATTR_PROTECTED, default=True): vol.Boolean(),
vol.Optional(ATTR_INGRESS_PANEL, default=False): vol.Boolean(),
}, extra=vol.REMOVE_EXTRA)
SCHEMA_ADDON_SYSTEM = SCHEMA_ADDON_CONFIG.extend({
vol.Required(ATTR_LOCATON): vol.Coerce(str),
vol.Required(ATTR_REPOSITORY): vol.Coerce(str),
})
SCHEMA_ADDONS_FILE = vol.Schema({
vol.Optional(ATTR_USER, default=dict): {
vol.Coerce(str): SCHEMA_ADDON_USER,
False,
),
vol.Optional(ATTR_IMAGE): vol.Match(RE_DOCKER_IMAGE),
vol.Optional(ATTR_TIMEOUT, default=10): vol.All(
vol.Coerce(int), vol.Range(min=10, max=120)
),
},
vol.Optional(ATTR_SYSTEM, default=dict): {
vol.Coerce(str): SCHEMA_ADDON_SYSTEM,
extra=vol.REMOVE_EXTRA,
)
# pylint: disable=no-value-for-parameter
SCHEMA_BUILD_CONFIG = vol.Schema(
{
vol.Optional(ATTR_BUILD_FROM, default=dict): vol.Schema(
{vol.In(ARCH_ALL): vol.Match(RE_DOCKER_IMAGE_BUILD)}
),
vol.Optional(ATTR_SQUASH, default=False): vol.Boolean(),
vol.Optional(ATTR_ARGS, default=dict): vol.Schema(
{vol.Coerce(str): vol.Coerce(str)}
),
},
extra=vol.REMOVE_EXTRA,
)
# pylint: disable=no-value-for-parameter
SCHEMA_ADDON_USER = vol.Schema(
{
vol.Required(ATTR_VERSION): vol.Coerce(str),
vol.Optional(ATTR_IMAGE): vol.Coerce(str),
vol.Optional(ATTR_UUID, default=lambda: uuid.uuid4().hex): UUID_MATCH,
vol.Optional(ATTR_ACCESS_TOKEN): TOKEN,
vol.Optional(ATTR_INGRESS_TOKEN, default=secrets.token_urlsafe): vol.Coerce(
str
),
vol.Optional(ATTR_OPTIONS, default=dict): dict,
vol.Optional(ATTR_AUTO_UPDATE, default=False): vol.Boolean(),
vol.Optional(ATTR_BOOT): vol.In([BOOT_AUTO, BOOT_MANUAL]),
vol.Optional(ATTR_NETWORK): DOCKER_PORTS,
vol.Optional(ATTR_AUDIO_OUTPUT): ALSA_DEVICE,
vol.Optional(ATTR_AUDIO_INPUT): ALSA_DEVICE,
vol.Optional(ATTR_PROTECTED, default=True): vol.Boolean(),
vol.Optional(ATTR_INGRESS_PANEL, default=False): vol.Boolean(),
},
extra=vol.REMOVE_EXTRA,
)
SCHEMA_ADDON_SYSTEM = SCHEMA_ADDON_CONFIG.extend(
{
vol.Required(ATTR_LOCATON): vol.Coerce(str),
vol.Required(ATTR_REPOSITORY): vol.Coerce(str),
}
})
)
SCHEMA_ADDON_SNAPSHOT = vol.Schema({
vol.Required(ATTR_USER): SCHEMA_ADDON_USER,
vol.Required(ATTR_SYSTEM): SCHEMA_ADDON_SYSTEM,
vol.Required(ATTR_STATE): vol.In([STATE_STARTED, STATE_STOPPED]),
vol.Required(ATTR_VERSION): vol.Coerce(str),
}, extra=vol.REMOVE_EXTRA)
SCHEMA_ADDONS_FILE = vol.Schema(
{
vol.Optional(ATTR_USER, default=dict): {vol.Coerce(str): SCHEMA_ADDON_USER},
vol.Optional(ATTR_SYSTEM, default=dict): {vol.Coerce(str): SCHEMA_ADDON_SYSTEM},
}
)
SCHEMA_ADDON_SNAPSHOT = vol.Schema(
{
vol.Required(ATTR_USER): SCHEMA_ADDON_USER,
vol.Required(ATTR_SYSTEM): SCHEMA_ADDON_SYSTEM,
vol.Required(ATTR_STATE): vol.In([STATE_STARTED, STATE_STOPPED]),
vol.Required(ATTR_VERSION): vol.Coerce(str),
},
extra=vol.REMOVE_EXTRA,
)
def validate_options(raw_schema):
"""Validate schema."""
def validate(struct):
"""Create schema validator for add-ons options."""
options = {}
@ -292,7 +329,7 @@ def validate_options(raw_schema):
except (IndexError, KeyError):
raise vol.Invalid(f"Type error for {key}") from None
_check_missing_options(raw_schema, options, 'root')
_check_missing_options(raw_schema, options, "root")
return options
return validate
@ -311,7 +348,7 @@ def _single_validate(typ, value, key):
# prepare range
range_args = {}
for group_name in ('i_min', 'i_max', 'f_min', 'f_max'):
for group_name in ("i_min", "i_max", "f_min", "f_max"):
group_value = match.group(group_name)
if group_value:
range_args[group_name[2:]] = float(group_value)
@ -331,7 +368,7 @@ def _single_validate(typ, value, key):
elif typ.startswith(V_PORT):
return NETWORK_PORT(value)
elif typ.startswith(V_MATCH):
return vol.Match(match.group('match'))(str(value))
return vol.Match(match.group("match"))(str(value))
raise vol.Invalid(f"Fatal error for {key} type {typ}")
@ -363,8 +400,7 @@ def _nested_validate_dict(typ, data_dict, key):
# Nested?
if isinstance(typ[c_key], list):
options[c_key] = _nested_validate_list(typ[c_key][0],
c_value, c_key)
options[c_key] = _nested_validate_list(typ[c_key][0], c_value, c_key)
else:
options[c_key] = _single_validate(typ[c_key], c_value, c_key)
@ -376,7 +412,6 @@ def _check_missing_options(origin, exists, root):
"""Check if all options are exists."""
missing = set(origin) - set(exists)
for miss_opt in missing:
if isinstance(origin[miss_opt], str) and \
origin[miss_opt].endswith("?"):
if isinstance(origin[miss_opt], str) and origin[miss_opt].endswith("?"):
continue
raise vol.Invalid(f"Missing option {miss_opt} in {root}")

View File

@ -32,7 +32,8 @@ class RestAPI(CoreSysAttributes):
self.coresys: CoreSys = coresys
self.security: SecurityMiddleware = SecurityMiddleware(coresys)
self.webapp: web.Application = web.Application(
middlewares=[self.security.token_validation])
middlewares=[self.security.token_validation]
)
# service stuff
self._runner: web.AppRunner = web.AppRunner(self.webapp)
@ -60,205 +61,224 @@ class RestAPI(CoreSysAttributes):
api_host = APIHost()
api_host.coresys = self.coresys
self.webapp.add_routes([
web.get('/host/info', api_host.info),
web.post('/host/reboot', api_host.reboot),
web.post('/host/shutdown', api_host.shutdown),
web.post('/host/reload', api_host.reload),
web.post('/host/options', api_host.options),
web.get('/host/services', api_host.services),
web.post('/host/services/{service}/stop', api_host.service_stop),
web.post('/host/services/{service}/start', api_host.service_start),
web.post('/host/services/{service}/restart',
api_host.service_restart),
web.post('/host/services/{service}/reload',
api_host.service_reload),
])
self.webapp.add_routes(
[
web.get("/host/info", api_host.info),
web.post("/host/reboot", api_host.reboot),
web.post("/host/shutdown", api_host.shutdown),
web.post("/host/reload", api_host.reload),
web.post("/host/options", api_host.options),
web.get("/host/services", api_host.services),
web.post("/host/services/{service}/stop", api_host.service_stop),
web.post("/host/services/{service}/start", api_host.service_start),
web.post("/host/services/{service}/restart", api_host.service_restart),
web.post("/host/services/{service}/reload", api_host.service_reload),
]
)
def _register_hassos(self) -> None:
"""Register HassOS functions."""
api_hassos = APIHassOS()
api_hassos.coresys = self.coresys
self.webapp.add_routes([
web.get('/hassos/info', api_hassos.info),
web.post('/hassos/update', api_hassos.update),
web.post('/hassos/update/cli', api_hassos.update_cli),
web.post('/hassos/config/sync', api_hassos.config_sync),
])
self.webapp.add_routes(
[
web.get("/hassos/info", api_hassos.info),
web.post("/hassos/update", api_hassos.update),
web.post("/hassos/update/cli", api_hassos.update_cli),
web.post("/hassos/config/sync", api_hassos.config_sync),
]
)
def _register_hardware(self) -> None:
"""Register hardware functions."""
api_hardware = APIHardware()
api_hardware.coresys = self.coresys
self.webapp.add_routes([
web.get('/hardware/info', api_hardware.info),
web.get('/hardware/audio', api_hardware.audio),
])
self.webapp.add_routes(
[
web.get("/hardware/info", api_hardware.info),
web.get("/hardware/audio", api_hardware.audio),
]
)
def _register_info(self) -> None:
"""Register info functions."""
api_info = APIInfo()
api_info.coresys = self.coresys
self.webapp.add_routes([
web.get('/info', api_info.info),
])
self.webapp.add_routes([web.get("/info", api_info.info)])
def _register_auth(self) -> None:
"""Register auth functions."""
api_auth = APIAuth()
api_auth.coresys = self.coresys
self.webapp.add_routes([
web.post('/auth', api_auth.auth),
])
self.webapp.add_routes([web.post("/auth", api_auth.auth)])
def _register_supervisor(self) -> None:
"""Register Supervisor functions."""
api_supervisor = APISupervisor()
api_supervisor.coresys = self.coresys
self.webapp.add_routes([
web.get('/supervisor/ping', api_supervisor.ping),
web.get('/supervisor/info', api_supervisor.info),
web.get('/supervisor/stats', api_supervisor.stats),
web.get('/supervisor/logs', api_supervisor.logs),
web.post('/supervisor/update', api_supervisor.update),
web.post('/supervisor/reload', api_supervisor.reload),
web.post('/supervisor/options', api_supervisor.options),
])
self.webapp.add_routes(
[
web.get("/supervisor/ping", api_supervisor.ping),
web.get("/supervisor/info", api_supervisor.info),
web.get("/supervisor/stats", api_supervisor.stats),
web.get("/supervisor/logs", api_supervisor.logs),
web.post("/supervisor/update", api_supervisor.update),
web.post("/supervisor/reload", api_supervisor.reload),
web.post("/supervisor/options", api_supervisor.options),
]
)
def _register_homeassistant(self) -> None:
"""Register Home Assistant functions."""
api_hass = APIHomeAssistant()
api_hass.coresys = self.coresys
self.webapp.add_routes([
web.get('/homeassistant/info', api_hass.info),
web.get('/homeassistant/logs', api_hass.logs),
web.get('/homeassistant/stats', api_hass.stats),
web.post('/homeassistant/options', api_hass.options),
web.post('/homeassistant/update', api_hass.update),
web.post('/homeassistant/restart', api_hass.restart),
web.post('/homeassistant/stop', api_hass.stop),
web.post('/homeassistant/start', api_hass.start),
web.post('/homeassistant/check', api_hass.check),
web.post('/homeassistant/rebuild', api_hass.rebuild),
])
self.webapp.add_routes(
[
web.get("/homeassistant/info", api_hass.info),
web.get("/homeassistant/logs", api_hass.logs),
web.get("/homeassistant/stats", api_hass.stats),
web.post("/homeassistant/options", api_hass.options),
web.post("/homeassistant/update", api_hass.update),
web.post("/homeassistant/restart", api_hass.restart),
web.post("/homeassistant/stop", api_hass.stop),
web.post("/homeassistant/start", api_hass.start),
web.post("/homeassistant/check", api_hass.check),
web.post("/homeassistant/rebuild", api_hass.rebuild),
]
)
def _register_proxy(self) -> None:
"""Register Home Assistant API Proxy."""
api_proxy = APIProxy()
api_proxy.coresys = self.coresys
self.webapp.add_routes([
web.get('/homeassistant/api/websocket', api_proxy.websocket),
web.get('/homeassistant/websocket', api_proxy.websocket),
web.get('/homeassistant/api/stream', api_proxy.stream),
web.post('/homeassistant/api/{path:.+}', api_proxy.api),
web.get('/homeassistant/api/{path:.+}', api_proxy.api),
web.get('/homeassistant/api/', api_proxy.api),
])
self.webapp.add_routes(
[
web.get("/homeassistant/api/websocket", api_proxy.websocket),
web.get("/homeassistant/websocket", api_proxy.websocket),
web.get("/homeassistant/api/stream", api_proxy.stream),
web.post("/homeassistant/api/{path:.+}", api_proxy.api),
web.get("/homeassistant/api/{path:.+}", api_proxy.api),
web.get("/homeassistant/api/", api_proxy.api),
]
)
def _register_addons(self) -> None:
"""Register Add-on functions."""
api_addons = APIAddons()
api_addons.coresys = self.coresys
self.webapp.add_routes([
web.get('/addons', api_addons.list),
web.post('/addons/reload', api_addons.reload),
web.get('/addons/{addon}/info', api_addons.info),
web.post('/addons/{addon}/install', api_addons.install),
web.post('/addons/{addon}/uninstall', api_addons.uninstall),
web.post('/addons/{addon}/start', api_addons.start),
web.post('/addons/{addon}/stop', api_addons.stop),
web.post('/addons/{addon}/restart', api_addons.restart),
web.post('/addons/{addon}/update', api_addons.update),
web.post('/addons/{addon}/options', api_addons.options),
web.post('/addons/{addon}/rebuild', api_addons.rebuild),
web.get('/addons/{addon}/logs', api_addons.logs),
web.get('/addons/{addon}/icon', api_addons.icon),
web.get('/addons/{addon}/logo', api_addons.logo),
web.get('/addons/{addon}/changelog', api_addons.changelog),
web.post('/addons/{addon}/stdin', api_addons.stdin),
web.post('/addons/{addon}/security', api_addons.security),
web.get('/addons/{addon}/stats', api_addons.stats),
])
self.webapp.add_routes(
[
web.get("/addons", api_addons.list),
web.post("/addons/reload", api_addons.reload),
web.get("/addons/{addon}/info", api_addons.info),
web.post("/addons/{addon}/install", api_addons.install),
web.post("/addons/{addon}/uninstall", api_addons.uninstall),
web.post("/addons/{addon}/start", api_addons.start),
web.post("/addons/{addon}/stop", api_addons.stop),
web.post("/addons/{addon}/restart", api_addons.restart),
web.post("/addons/{addon}/update", api_addons.update),
web.post("/addons/{addon}/options", api_addons.options),
web.post("/addons/{addon}/rebuild", api_addons.rebuild),
web.get("/addons/{addon}/logs", api_addons.logs),
web.get("/addons/{addon}/icon", api_addons.icon),
web.get("/addons/{addon}/logo", api_addons.logo),
web.get("/addons/{addon}/changelog", api_addons.changelog),
web.post("/addons/{addon}/stdin", api_addons.stdin),
web.post("/addons/{addon}/security", api_addons.security),
web.get("/addons/{addon}/stats", api_addons.stats),
]
)
def _register_ingress(self) -> None:
"""Register Ingress functions."""
api_ingress = APIIngress()
api_ingress.coresys = self.coresys
self.webapp.add_routes([
web.post('/ingress/session', api_ingress.create_session),
web.get('/ingress/panels', api_ingress.panels),
web.view('/ingress/{token}/{path:.*}', api_ingress.handler),
])
self.webapp.add_routes(
[
web.post("/ingress/session", api_ingress.create_session),
web.get("/ingress/panels", api_ingress.panels),
web.view("/ingress/{token}/{path:.*}", api_ingress.handler),
]
)
def _register_snapshots(self) -> None:
"""Register snapshots functions."""
api_snapshots = APISnapshots()
api_snapshots.coresys = self.coresys
self.webapp.add_routes([
web.get('/snapshots', api_snapshots.list),
web.post('/snapshots/reload', api_snapshots.reload),
web.post('/snapshots/new/full', api_snapshots.snapshot_full),
web.post('/snapshots/new/partial', api_snapshots.snapshot_partial),
web.post('/snapshots/new/upload', api_snapshots.upload),
web.get('/snapshots/{snapshot}/info', api_snapshots.info),
web.post('/snapshots/{snapshot}/remove', api_snapshots.remove),
web.post('/snapshots/{snapshot}/restore/full',
api_snapshots.restore_full),
web.post('/snapshots/{snapshot}/restore/partial',
api_snapshots.restore_partial),
web.get('/snapshots/{snapshot}/download', api_snapshots.download),
])
self.webapp.add_routes(
[
web.get("/snapshots", api_snapshots.list),
web.post("/snapshots/reload", api_snapshots.reload),
web.post("/snapshots/new/full", api_snapshots.snapshot_full),
web.post("/snapshots/new/partial", api_snapshots.snapshot_partial),
web.post("/snapshots/new/upload", api_snapshots.upload),
web.get("/snapshots/{snapshot}/info", api_snapshots.info),
web.post("/snapshots/{snapshot}/remove", api_snapshots.remove),
web.post(
"/snapshots/{snapshot}/restore/full", api_snapshots.restore_full
),
web.post(
"/snapshots/{snapshot}/restore/partial",
api_snapshots.restore_partial,
),
web.get("/snapshots/{snapshot}/download", api_snapshots.download),
]
)
def _register_services(self) -> None:
"""Register services functions."""
api_services = APIServices()
api_services.coresys = self.coresys
self.webapp.add_routes([
web.get('/services', api_services.list),
web.get('/services/{service}', api_services.get_service),
web.post('/services/{service}', api_services.set_service),
web.delete('/services/{service}', api_services.del_service),
])
self.webapp.add_routes(
[
web.get("/services", api_services.list),
web.get("/services/{service}", api_services.get_service),
web.post("/services/{service}", api_services.set_service),
web.delete("/services/{service}", api_services.del_service),
]
)
def _register_discovery(self) -> None:
"""Register discovery functions."""
api_discovery = APIDiscovery()
api_discovery.coresys = self.coresys
self.webapp.add_routes([
web.get('/discovery', api_discovery.list),
web.get('/discovery/{uuid}', api_discovery.get_discovery),
web.delete('/discovery/{uuid}', api_discovery.del_discovery),
web.post('/discovery', api_discovery.set_discovery),
])
self.webapp.add_routes(
[
web.get("/discovery", api_discovery.list),
web.get("/discovery/{uuid}", api_discovery.get_discovery),
web.delete("/discovery/{uuid}", api_discovery.del_discovery),
web.post("/discovery", api_discovery.set_discovery),
]
)
def _register_panel(self) -> None:
"""Register panel for Home Assistant."""
panel_dir = Path(__file__).parent.joinpath("panel")
self.webapp.add_routes([web.static('/app', panel_dir)])
self.webapp.add_routes([web.static("/app", panel_dir)])
async def start(self) -> None:
"""Run RESTful API webserver."""
await self._runner.setup()
self._site = web.TCPSite(
self._runner, host="0.0.0.0", port=80, shutdown_timeout=5)
self._runner, host="0.0.0.0", port=80, shutdown_timeout=5
)
try:
await self._site.start()
except OSError as err:
_LOGGER.fatal("Failed to create HTTP server at 0.0.0.0:80 -> %s",
err)
_LOGGER.fatal("Failed to create HTTP server at 0.0.0.0:80 -> %s", err)
else:
_LOGGER.info("Start API on %s", self.sys_docker.network.supervisor)

View File

@ -91,35 +91,35 @@ from .utils import api_process, api_process_raw, api_validate
_LOGGER = logging.getLogger(__name__)
SCHEMA_VERSION = vol.Schema({
vol.Optional(ATTR_VERSION): vol.Coerce(str),
})
SCHEMA_VERSION = vol.Schema({vol.Optional(ATTR_VERSION): vol.Coerce(str)})
# pylint: disable=no-value-for-parameter
SCHEMA_OPTIONS = vol.Schema({
vol.Optional(ATTR_BOOT): vol.In([BOOT_AUTO, BOOT_MANUAL]),
vol.Optional(ATTR_NETWORK): vol.Any(None, DOCKER_PORTS),
vol.Optional(ATTR_AUTO_UPDATE): vol.Boolean(),
vol.Optional(ATTR_AUDIO_OUTPUT): ALSA_DEVICE,
vol.Optional(ATTR_AUDIO_INPUT): ALSA_DEVICE,
vol.Optional(ATTR_INGRESS_PANEL): vol.Boolean(),
})
SCHEMA_OPTIONS = vol.Schema(
{
vol.Optional(ATTR_BOOT): vol.In([BOOT_AUTO, BOOT_MANUAL]),
vol.Optional(ATTR_NETWORK): vol.Any(None, DOCKER_PORTS),
vol.Optional(ATTR_AUTO_UPDATE): vol.Boolean(),
vol.Optional(ATTR_AUDIO_OUTPUT): ALSA_DEVICE,
vol.Optional(ATTR_AUDIO_INPUT): ALSA_DEVICE,
vol.Optional(ATTR_INGRESS_PANEL): vol.Boolean(),
}
)
# pylint: disable=no-value-for-parameter
SCHEMA_SECURITY = vol.Schema({
vol.Optional(ATTR_PROTECTED): vol.Boolean(),
})
SCHEMA_SECURITY = vol.Schema({vol.Optional(ATTR_PROTECTED): vol.Boolean()})
class APIAddons(CoreSysAttributes):
"""Handle RESTful API for add-on functions."""
def _extract_addon(self, request: web.Request, check_installed: bool = True) -> AnyAddon:
def _extract_addon(
self, request: web.Request, check_installed: bool = True
) -> AnyAddon:
"""Return addon, throw an exception it it doesn't exist."""
addon_slug = request.match_info.get('addon')
addon_slug = request.match_info.get("addon")
# Lookup itself
if addon_slug == 'self':
if addon_slug == "self":
return request.get(REQUEST_FROM)
addon = self.sys_addons.get(addon_slug)
@ -136,35 +136,36 @@ class APIAddons(CoreSysAttributes):
"""Return all add-ons or repositories."""
data_addons = []
for addon in self.sys_addons.all:
data_addons.append({
ATTR_NAME: addon.name,
ATTR_SLUG: addon.slug,
ATTR_DESCRIPTON: addon.description,
ATTR_VERSION: addon.latest_version,
ATTR_INSTALLED: addon.version if addon.is_installed else None,
ATTR_AVAILABLE: addon.available,
ATTR_DETACHED: addon.is_detached,
ATTR_REPOSITORY: addon.repository,
ATTR_BUILD: addon.need_build,
ATTR_URL: addon.url,
ATTR_ICON: addon.with_icon,
ATTR_LOGO: addon.with_logo,
})
data_addons.append(
{
ATTR_NAME: addon.name,
ATTR_SLUG: addon.slug,
ATTR_DESCRIPTON: addon.description,
ATTR_VERSION: addon.latest_version,
ATTR_INSTALLED: addon.version if addon.is_installed else None,
ATTR_AVAILABLE: addon.available,
ATTR_DETACHED: addon.is_detached,
ATTR_REPOSITORY: addon.repository,
ATTR_BUILD: addon.need_build,
ATTR_URL: addon.url,
ATTR_ICON: addon.with_icon,
ATTR_LOGO: addon.with_logo,
}
)
data_repositories = []
for repository in self.sys_store.all:
data_repositories.append({
ATTR_SLUG: repository.slug,
ATTR_NAME: repository.name,
ATTR_SOURCE: repository.source,
ATTR_URL: repository.url,
ATTR_MAINTAINER: repository.maintainer,
})
data_repositories.append(
{
ATTR_SLUG: repository.slug,
ATTR_NAME: repository.name,
ATTR_SOURCE: repository.source,
ATTR_URL: repository.url,
ATTR_MAINTAINER: repository.maintainer,
}
)
return {
ATTR_ADDONS: data_addons,
ATTR_REPOSITORIES: data_repositories,
}
return {ATTR_ADDONS: data_addons, ATTR_REPOSITORIES: data_repositories}
@api_process
async def reload(self, request: web.Request) -> None:
@ -234,19 +235,21 @@ class APIAddons(CoreSysAttributes):
}
if addon.is_installed:
data.update({
ATTR_STATE: await addon.state(),
ATTR_WEBUI: addon.webui,
ATTR_INGRESS_ENTRY: addon.ingress_entry,
ATTR_INGRESS_URL: addon.ingress_url,
ATTR_INGRESS_PORT: addon.ingress_port,
ATTR_INGRESS_PANEL: addon.ingress_panel,
ATTR_AUDIO_INPUT: addon.audio_input,
ATTR_AUDIO_OUTPUT: addon.audio_output,
ATTR_AUTO_UPDATE: addon.auto_update,
ATTR_IP_ADDRESS: str(addon.ip_address),
ATTR_VERSION: addon.version,
})
data.update(
{
ATTR_STATE: await addon.state(),
ATTR_WEBUI: addon.webui,
ATTR_INGRESS_ENTRY: addon.ingress_entry,
ATTR_INGRESS_URL: addon.ingress_url,
ATTR_INGRESS_PORT: addon.ingress_port,
ATTR_INGRESS_PANEL: addon.ingress_panel,
ATTR_AUDIO_INPUT: addon.audio_input,
ATTR_AUDIO_OUTPUT: addon.audio_output,
ATTR_AUTO_UPDATE: addon.auto_update,
ATTR_IP_ADDRESS: str(addon.ip_address),
ATTR_VERSION: addon.version,
}
)
return data
@ -255,9 +258,9 @@ class APIAddons(CoreSysAttributes):
"""Store user options for add-on."""
addon = self._extract_addon(request)
addon_schema = SCHEMA_OPTIONS.extend({
vol.Optional(ATTR_OPTIONS): vol.Any(None, addon.schema),
})
addon_schema = SCHEMA_OPTIONS.extend(
{vol.Optional(ATTR_OPTIONS): vol.Any(None, addon.schema)}
)
body = await api_validate(addon_schema, request)
if ATTR_OPTIONS in body:
@ -376,7 +379,7 @@ class APIAddons(CoreSysAttributes):
if not addon.with_icon:
raise APIError("No icon found!")
with addon.path_icon.open('rb') as png:
with addon.path_icon.open("rb") as png:
return png.read()
@api_process_raw(CONTENT_TYPE_PNG)
@ -386,7 +389,7 @@ class APIAddons(CoreSysAttributes):
if not addon.with_logo:
raise APIError("No logo found!")
with addon.path_logo.open('rb') as png:
with addon.path_logo.open("rb") as png:
return png.read()
@api_process_raw(CONTENT_TYPE_TEXT)
@ -396,7 +399,7 @@ class APIAddons(CoreSysAttributes):
if not addon.with_changelog:
raise APIError("No changelog found!")
with addon.path_changelog.open('r') as changelog:
with addon.path_changelog.open("r") as changelog:
return changelog.read()
@api_process
@ -415,7 +418,7 @@ def _pretty_devices(addon: AnyAddon) -> List[str]:
dev_list = addon.devices
if not dev_list:
return None
return [row.split(':')[0] for row in dev_list]
return [row.split(":")[0] for row in dev_list]
def _pretty_services(addon: AnyAddon) -> List[str]:

View File

@ -29,8 +29,8 @@ class APIAuth(CoreSysAttributes):
Return a coroutine.
"""
username = data.get('username') or data.get('user')
password = data.get('password')
username = data.get("username") or data.get("user")
password = data.get("password")
return self.sys_auth.check_login(addon, username, password)
@ -56,6 +56,6 @@ class APIAuth(CoreSysAttributes):
data = await request.post()
return await self._process_dict(request, addon, data)
raise HTTPUnauthorized(headers={
WWW_AUTHENTICATE: "Basic realm=\"Hass.io Authentication\""
})
raise HTTPUnauthorized(
headers={WWW_AUTHENTICATE: 'Basic realm="Hass.io Authentication"'}
)

View File

@ -3,7 +3,13 @@ import logging
from .utils import api_process
from ..const import (
ATTR_SERIAL, ATTR_DISK, ATTR_GPIO, ATTR_AUDIO, ATTR_INPUT, ATTR_OUTPUT)
ATTR_SERIAL,
ATTR_DISK,
ATTR_GPIO,
ATTR_AUDIO,
ATTR_INPUT,
ATTR_OUTPUT,
)
from ..coresys import CoreSysAttributes
_LOGGER = logging.getLogger(__name__)

View File

@ -6,18 +6,25 @@ import voluptuous as vol
from .utils import api_process, api_validate
from ..const import (
ATTR_HOSTNAME, ATTR_FEATURES, ATTR_KERNEL, ATTR_OPERATING_SYSTEM,
ATTR_CHASSIS, ATTR_DEPLOYMENT, ATTR_STATE, ATTR_NAME, ATTR_DESCRIPTON,
ATTR_SERVICES, ATTR_CPE)
ATTR_HOSTNAME,
ATTR_FEATURES,
ATTR_KERNEL,
ATTR_OPERATING_SYSTEM,
ATTR_CHASSIS,
ATTR_DEPLOYMENT,
ATTR_STATE,
ATTR_NAME,
ATTR_DESCRIPTON,
ATTR_SERVICES,
ATTR_CPE,
)
from ..coresys import CoreSysAttributes
_LOGGER = logging.getLogger(__name__)
SERVICE = 'service'
SERVICE = "service"
SCHEMA_OPTIONS = vol.Schema({
vol.Optional(ATTR_HOSTNAME): vol.Coerce(str),
})
SCHEMA_OPTIONS = vol.Schema({vol.Optional(ATTR_HOSTNAME): vol.Coerce(str)})
class APIHost(CoreSysAttributes):
@ -44,7 +51,8 @@ class APIHost(CoreSysAttributes):
# hostname
if ATTR_HOSTNAME in body:
await asyncio.shield(
self.sys_host.control.set_hostname(body[ATTR_HOSTNAME]))
self.sys_host.control.set_hostname(body[ATTR_HOSTNAME])
)
@api_process
def reboot(self, request):
@ -66,15 +74,15 @@ class APIHost(CoreSysAttributes):
"""Return list of available services."""
services = []
for unit in self.sys_host.services:
services.append({
ATTR_NAME: unit.name,
ATTR_DESCRIPTON: unit.description,
ATTR_STATE: unit.state,
})
services.append(
{
ATTR_NAME: unit.name,
ATTR_DESCRIPTON: unit.description,
ATTR_STATE: unit.state,
}
)
return {
ATTR_SERVICES: services
}
return {ATTR_SERVICES: services}
@api_process
def service_start(self, request):

View File

@ -12,8 +12,7 @@ import async_timeout
from ..const import HEADER_HA_ACCESS
from ..coresys import CoreSysAttributes
from ..exceptions import (
HomeAssistantAuthError, HomeAssistantAPIError, APIError)
from ..exceptions import HomeAssistantAuthError, HomeAssistantAPIError, APIError
_LOGGER = logging.getLogger(__name__)
@ -25,7 +24,7 @@ class APIProxy(CoreSysAttributes):
"""Check the Hass.io token."""
if AUTHORIZATION in request.headers:
bearer = request.headers[AUTHORIZATION]
hassio_token = bearer.split(' ')[-1]
hassio_token = bearer.split(" ")[-1]
else:
hassio_token = request.headers.get(HEADER_HA_ACCESS)
@ -54,10 +53,11 @@ class APIProxy(CoreSysAttributes):
content_type = None
async with self.sys_homeassistant.make_request(
request.method.lower(), f'api/{path}',
content_type=content_type,
data=data,
timeout=timeout,
request.method.lower(),
f"api/{path}",
content_type=content_type,
data=data,
timeout=timeout,
) as resp:
yield resp
return
@ -78,7 +78,7 @@ class APIProxy(CoreSysAttributes):
self._check_access(request)
_LOGGER.info("Home Assistant EventStream start")
async with self._api_client(request, 'stream', timeout=None) as client:
async with self._api_client(request, "stream", timeout=None) as client:
response = web.StreamResponse()
response.content_type = request.headers.get(CONTENT_TYPE)
try:
@ -97,13 +97,11 @@ class APIProxy(CoreSysAttributes):
self._check_access(request)
# Normal request
path = request.match_info.get('path', '')
path = request.match_info.get("path", "")
async with self._api_client(request, path) as client:
data = await client.read()
return web.Response(
body=data,
status=client.status,
content_type=client.content_type
body=data, status=client.status, content_type=client.content_type
)
async def _websocket_client(self):
@ -112,39 +110,46 @@ class APIProxy(CoreSysAttributes):
try:
client = await self.sys_websession_ssl.ws_connect(
url, heartbeat=30, verify_ssl=False)
url, heartbeat=30, verify_ssl=False
)
# Handle authentication
data = await client.receive_json()
if data.get('type') == 'auth_ok':
if data.get("type") == "auth_ok":
return client
if data.get('type') != 'auth_required':
if data.get("type") != "auth_required":
# Invalid protocol
_LOGGER.error(
"Got unexpected response from HA WebSocket: %s", data)
_LOGGER.error("Got unexpected response from HA WebSocket: %s", data)
raise APIError()
if self.sys_homeassistant.refresh_token:
await self.sys_homeassistant.ensure_access_token()
await client.send_json({
'type': 'auth',
'access_token': self.sys_homeassistant.access_token,
})
await client.send_json(
{
"type": "auth",
"access_token": self.sys_homeassistant.access_token,
}
)
else:
await client.send_json({
'type': 'auth',
'api_password': self.sys_homeassistant.api_password,
})
await client.send_json(
{
"type": "auth",
"api_password": self.sys_homeassistant.api_password,
}
)
data = await client.receive_json()
if data.get('type') == 'auth_ok':
if data.get("type") == "auth_ok":
return client
# Renew the Token is invalid
if data.get('type') == 'invalid_auth' and self.sys_homeassistant.refresh_token:
if (
data.get("type") == "invalid_auth"
and self.sys_homeassistant.refresh_token
):
self.sys_homeassistant.access_token = None
return await self._websocket_client()
@ -167,30 +172,27 @@ class APIProxy(CoreSysAttributes):
# handle authentication
try:
await server.send_json({
'type': 'auth_required',
'ha_version': self.sys_homeassistant.version,
})
await server.send_json(
{"type": "auth_required", "ha_version": self.sys_homeassistant.version}
)
# Check API access
response = await server.receive_json()
hassio_token = response.get('api_password') or response.get('access_token')
hassio_token = response.get("api_password") or response.get("access_token")
addon = self.sys_addons.from_token(hassio_token)
if not addon or not addon.access_homeassistant_api:
_LOGGER.warning("Unauthorized WebSocket access!")
await server.send_json({
'type': 'auth_invalid',
'message': 'Invalid access',
})
await server.send_json(
{"type": "auth_invalid", "message": "Invalid access"}
)
return server
_LOGGER.info("WebSocket access from %s", addon.slug)
await server.send_json({
'type': 'auth_ok',
'ha_version': self.sys_homeassistant.version,
})
await server.send_json(
{"type": "auth_ok", "ha_version": self.sys_homeassistant.version}
)
except (RuntimeError, ValueError) as err:
_LOGGER.error("Can't initialize handshake: %s", err)
return server
@ -207,16 +209,13 @@ class APIProxy(CoreSysAttributes):
server_read = None
while not server.closed and not client.closed:
if not client_read:
client_read = self.sys_create_task(
client.receive_str())
client_read = self.sys_create_task(client.receive_str())
if not server_read:
server_read = self.sys_create_task(
server.receive_str())
server_read = self.sys_create_task(server.receive_str())
# wait until data need to be processed
await asyncio.wait(
[client_read, server_read],
return_when=asyncio.FIRST_COMPLETED
[client_read, server_read], return_when=asyncio.FIRST_COMPLETED
)
# server

View File

@ -2,8 +2,13 @@
from .utils import api_process, api_validate
from ..const import (
ATTR_AVAILABLE, ATTR_PROVIDERS, ATTR_SLUG, ATTR_SERVICES, REQUEST_FROM,
PROVIDE_SERVICE)
ATTR_AVAILABLE,
ATTR_PROVIDERS,
ATTR_SLUG,
ATTR_SERVICES,
REQUEST_FROM,
PROVIDE_SERVICE,
)
from ..coresys import CoreSysAttributes
from ..exceptions import APIError, APIForbidden
@ -13,7 +18,7 @@ class APIServices(CoreSysAttributes):
def _extract_service(self, request):
"""Return service, throw an exception if it doesn't exist."""
service = self.sys_services.get(request.match_info.get('service'))
service = self.sys_services.get(request.match_info.get("service"))
if not service:
raise APIError("Service does not exist")
@ -24,11 +29,13 @@ class APIServices(CoreSysAttributes):
"""Show register services."""
services = []
for service in self.sys_services.list_services:
services.append({
ATTR_SLUG: service.slug,
ATTR_AVAILABLE: service.enabled,
ATTR_PROVIDERS: service.providers,
})
services.append(
{
ATTR_SLUG: service.slug,
ATTR_AVAILABLE: service.enabled,
ATTR_PROVIDERS: service.providers,
}
)
return {ATTR_SERVICES: services}

View File

@ -10,9 +10,21 @@ import voluptuous as vol
from .utils import api_process, api_validate
from ..snapshots.validate import ALL_FOLDERS
from ..const import (
ATTR_NAME, ATTR_SLUG, ATTR_DATE, ATTR_ADDONS, ATTR_REPOSITORIES,
ATTR_HOMEASSISTANT, ATTR_VERSION, ATTR_SIZE, ATTR_FOLDERS, ATTR_TYPE,
ATTR_SNAPSHOTS, ATTR_PASSWORD, ATTR_PROTECTED, CONTENT_TYPE_TAR)
ATTR_NAME,
ATTR_SLUG,
ATTR_DATE,
ATTR_ADDONS,
ATTR_REPOSITORIES,
ATTR_HOMEASSISTANT,
ATTR_VERSION,
ATTR_SIZE,
ATTR_FOLDERS,
ATTR_TYPE,
ATTR_SNAPSHOTS,
ATTR_PASSWORD,
ATTR_PROTECTED,
CONTENT_TYPE_TAR,
)
from ..coresys import CoreSysAttributes
from ..exceptions import APIError
@ -20,30 +32,32 @@ _LOGGER = logging.getLogger(__name__)
# pylint: disable=no-value-for-parameter
SCHEMA_RESTORE_PARTIAL = vol.Schema({
vol.Optional(ATTR_PASSWORD): vol.Any(None, vol.Coerce(str)),
vol.Optional(ATTR_HOMEASSISTANT): vol.Boolean(),
vol.Optional(ATTR_ADDONS):
vol.All([vol.Coerce(str)], vol.Unique()),
vol.Optional(ATTR_FOLDERS):
vol.All([vol.In(ALL_FOLDERS)], vol.Unique()),
})
SCHEMA_RESTORE_PARTIAL = vol.Schema(
{
vol.Optional(ATTR_PASSWORD): vol.Any(None, vol.Coerce(str)),
vol.Optional(ATTR_HOMEASSISTANT): vol.Boolean(),
vol.Optional(ATTR_ADDONS): vol.All([vol.Coerce(str)], vol.Unique()),
vol.Optional(ATTR_FOLDERS): vol.All([vol.In(ALL_FOLDERS)], vol.Unique()),
}
)
SCHEMA_RESTORE_FULL = vol.Schema({
vol.Optional(ATTR_PASSWORD): vol.Any(None, vol.Coerce(str)),
})
SCHEMA_RESTORE_FULL = vol.Schema(
{vol.Optional(ATTR_PASSWORD): vol.Any(None, vol.Coerce(str))}
)
SCHEMA_SNAPSHOT_FULL = vol.Schema({
vol.Optional(ATTR_NAME): vol.Coerce(str),
vol.Optional(ATTR_PASSWORD): vol.Any(None, vol.Coerce(str)),
})
SCHEMA_SNAPSHOT_FULL = vol.Schema(
{
vol.Optional(ATTR_NAME): vol.Coerce(str),
vol.Optional(ATTR_PASSWORD): vol.Any(None, vol.Coerce(str)),
}
)
SCHEMA_SNAPSHOT_PARTIAL = SCHEMA_SNAPSHOT_FULL.extend({
vol.Optional(ATTR_ADDONS):
vol.All([vol.Coerce(str)], vol.Unique()),
vol.Optional(ATTR_FOLDERS):
vol.All([vol.In(ALL_FOLDERS)], vol.Unique()),
})
SCHEMA_SNAPSHOT_PARTIAL = SCHEMA_SNAPSHOT_FULL.extend(
{
vol.Optional(ATTR_ADDONS): vol.All([vol.Coerce(str)], vol.Unique()),
vol.Optional(ATTR_FOLDERS): vol.All([vol.In(ALL_FOLDERS)], vol.Unique()),
}
)
class APISnapshots(CoreSysAttributes):
@ -51,7 +65,7 @@ class APISnapshots(CoreSysAttributes):
def _extract_snapshot(self, request):
"""Return snapshot, throw an exception if it doesn't exist."""
snapshot = self.sys_snapshots.get(request.match_info.get('snapshot'))
snapshot = self.sys_snapshots.get(request.match_info.get("snapshot"))
if not snapshot:
raise APIError("Snapshot does not exist")
return snapshot
@ -61,17 +75,17 @@ class APISnapshots(CoreSysAttributes):
"""Return snapshot list."""
data_snapshots = []
for snapshot in self.sys_snapshots.list_snapshots:
data_snapshots.append({
ATTR_SLUG: snapshot.slug,
ATTR_NAME: snapshot.name,
ATTR_DATE: snapshot.date,
ATTR_TYPE: snapshot.sys_type,
ATTR_PROTECTED: snapshot.protected,
})
data_snapshots.append(
{
ATTR_SLUG: snapshot.slug,
ATTR_NAME: snapshot.name,
ATTR_DATE: snapshot.date,
ATTR_TYPE: snapshot.sys_type,
ATTR_PROTECTED: snapshot.protected,
}
)
return {
ATTR_SNAPSHOTS: data_snapshots,
}
return {ATTR_SNAPSHOTS: data_snapshots}
@api_process
async def reload(self, request):
@ -86,12 +100,14 @@ class APISnapshots(CoreSysAttributes):
data_addons = []
for addon_data in snapshot.addons:
data_addons.append({
ATTR_SLUG: addon_data[ATTR_SLUG],
ATTR_NAME: addon_data[ATTR_NAME],
ATTR_VERSION: addon_data[ATTR_VERSION],
ATTR_SIZE: addon_data[ATTR_SIZE],
})
data_addons.append(
{
ATTR_SLUG: addon_data[ATTR_SLUG],
ATTR_NAME: addon_data[ATTR_NAME],
ATTR_VERSION: addon_data[ATTR_VERSION],
ATTR_SIZE: addon_data[ATTR_SIZE],
}
)
return {
ATTR_SLUG: snapshot.slug,
@ -110,8 +126,7 @@ class APISnapshots(CoreSysAttributes):
async def snapshot_full(self, request):
"""Full-Snapshot a snapshot."""
body = await api_validate(SCHEMA_SNAPSHOT_FULL, request)
snapshot = await asyncio.shield(
self.sys_snapshots.do_snapshot_full(**body))
snapshot = await asyncio.shield(self.sys_snapshots.do_snapshot_full(**body))
if snapshot:
return {ATTR_SLUG: snapshot.slug}
@ -121,8 +136,7 @@ class APISnapshots(CoreSysAttributes):
async def snapshot_partial(self, request):
"""Partial-Snapshot a snapshot."""
body = await api_validate(SCHEMA_SNAPSHOT_PARTIAL, request)
snapshot = await asyncio.shield(
self.sys_snapshots.do_snapshot_partial(**body))
snapshot = await asyncio.shield(self.sys_snapshots.do_snapshot_partial(**body))
if snapshot:
return {ATTR_SLUG: snapshot.slug}
@ -135,7 +149,8 @@ class APISnapshots(CoreSysAttributes):
body = await api_validate(SCHEMA_RESTORE_FULL, request)
return await asyncio.shield(
self.sys_snapshots.do_restore_full(snapshot, **body))
self.sys_snapshots.do_restore_full(snapshot, **body)
)
@api_process
async def restore_partial(self, request):
@ -144,7 +159,8 @@ class APISnapshots(CoreSysAttributes):
body = await api_validate(SCHEMA_RESTORE_PARTIAL, request)
return await asyncio.shield(
self.sys_snapshots.do_restore_partial(snapshot, **body))
self.sys_snapshots.do_restore_partial(snapshot, **body)
)
@api_process
async def remove(self, request):
@ -168,7 +184,7 @@ class APISnapshots(CoreSysAttributes):
tar_file = Path(temp_dir, f"snapshot.tar")
try:
with tar_file.open('wb') as snapshot:
with tar_file.open("wb") as snapshot:
async for data in request.content.iter_any():
snapshot.write(data)
@ -180,7 +196,8 @@ class APISnapshots(CoreSysAttributes):
return False
snapshot = await asyncio.shield(
self.sys_snapshots.import_snapshot(tar_file))
self.sys_snapshots.import_snapshot(tar_file)
)
if snapshot:
return {ATTR_SLUG: snapshot.slug}

View File

@ -7,8 +7,13 @@ import voluptuous as vol
from voluptuous.humanize import humanize_error
from ..const import (
JSON_RESULT, JSON_DATA, JSON_MESSAGE, RESULT_OK, RESULT_ERROR,
CONTENT_TYPE_BINARY)
JSON_RESULT,
JSON_DATA,
JSON_MESSAGE,
RESULT_OK,
RESULT_ERROR,
CONTENT_TYPE_BINARY,
)
from ..exceptions import HassioError, APIError, APIForbidden
_LOGGER = logging.getLogger(__name__)
@ -26,6 +31,7 @@ def json_loads(data):
def api_process(method):
"""Wrap function with true/false calls to rest api."""
async def wrap_api(api, *args, **kwargs):
"""Return API information."""
try:
@ -48,8 +54,10 @@ def api_process(method):
def api_process_raw(content):
"""Wrap content_type into function."""
def wrap_method(method):
"""Wrap function with raw output to rest api."""
async def wrap_api(api, *args, **kwargs):
"""Return api information."""
try:
@ -59,29 +67,26 @@ def api_process_raw(content):
msg_data = str(err).encode()
msg_type = CONTENT_TYPE_BINARY
except HassioError:
msg_data = b''
msg_data = b""
msg_type = CONTENT_TYPE_BINARY
return web.Response(body=msg_data, content_type=msg_type)
return wrap_api
return wrap_method
def api_return_error(message=None):
"""Return an API error message."""
return web.json_response({
JSON_RESULT: RESULT_ERROR,
JSON_MESSAGE: message,
}, status=400)
return web.json_response(
{JSON_RESULT: RESULT_ERROR, JSON_MESSAGE: message}, status=400
)
def api_return_ok(data=None):
"""Return an API ok answer."""
return web.json_response({
JSON_RESULT: RESULT_OK,
JSON_DATA: data or {},
})
return web.json_response({JSON_RESULT: RESULT_OK, JSON_DATA: data or {}})
async def api_validate(schema, request):

View File

@ -2,8 +2,7 @@
import logging
import hashlib
from .const import (
FILE_HASSIO_AUTH, ATTR_PASSWORD, ATTR_USERNAME, ATTR_ADDON)
from .const import FILE_HASSIO_AUTH, ATTR_PASSWORD, ATTR_USERNAME, ATTR_ADDON
from .coresys import CoreSysAttributes
from .utils.json import JsonConfig
from .validate import SCHEMA_AUTH_CONFIG
@ -68,11 +67,14 @@ class Auth(JsonConfig, CoreSysAttributes):
try:
async with self.sys_homeassistant.make_request(
'post', 'api/hassio_auth', json={
ATTR_USERNAME: username,
ATTR_PASSWORD: password,
ATTR_ADDON: addon.slug,
}) as req:
"post",
"api/hassio_auth",
json={
ATTR_USERNAME: username,
ATTR_PASSWORD: password,
ATTR_ADDON: addon.slug,
},
) as req:
if req.status == 200:
_LOGGER.info("Success login from %s", username)

View File

@ -219,6 +219,6 @@ def supervisor_debugger(coresys: CoreSys) -> None:
_LOGGER.info("Initialize Hass.io debugger")
ptvsd.enable_attach(address=('0.0.0.0', 33333), redirect_output=True)
ptvsd.enable_attach(address=("0.0.0.0", 33333), redirect_output=True)
if coresys.config.debug_block:
ptvsd.wait_for_attach()

View File

@ -44,7 +44,8 @@ class CoreSys:
self._loop: asyncio.BaseEventLoop = asyncio.get_running_loop()
self._websession: aiohttp.ClientSession = aiohttp.ClientSession()
self._websession_ssl: aiohttp.ClientSession = aiohttp.ClientSession(
connector=aiohttp.TCPConnector(ssl=False))
connector=aiohttp.TCPConnector(ssl=False)
)
# Global objects
self._config: CoreConfig = CoreConfig()

View File

@ -8,8 +8,8 @@ from ..utils.gdbus import DBus
_LOGGER = logging.getLogger(__name__)
DBUS_NAME = 'org.freedesktop.hostname1'
DBUS_OBJECT = '/org/freedesktop/hostname1'
DBUS_NAME = "org.freedesktop.hostname1"
DBUS_OBJECT = "/org/freedesktop/hostname1"
class Hostname(DBusInterface):

View File

@ -8,8 +8,8 @@ from ..utils.gdbus import DBus
_LOGGER = logging.getLogger(__name__)
DBUS_NAME = 'de.pengutronix.rauc'
DBUS_OBJECT = '/'
DBUS_NAME = "de.pengutronix.rauc"
DBUS_OBJECT = "/"
class Rauc(DBusInterface):

View File

@ -8,8 +8,8 @@ from ..utils.gdbus import DBus
_LOGGER = logging.getLogger(__name__)
DBUS_NAME = 'org.freedesktop.systemd1'
DBUS_OBJECT = '/org/freedesktop/systemd1'
DBUS_NAME = "org.freedesktop.systemd1"
DBUS_OBJECT = "/org/freedesktop/systemd1"
class Systemd(DBusInterface):

View File

@ -5,6 +5,7 @@ from ..exceptions import DBusNotConnectedError
def dbus_connected(method):
"""Wrapper for check if D-Bus is connected."""
def wrap_dbus(api, *args, **kwargs):
"""Check if D-Bus is connected before call a method."""
if api.dbus is None:

View File

@ -59,7 +59,8 @@ class DockerAddon(DockerInterface):
# Extract IP-Address
try:
return ip_address(
self._meta["NetworkSettings"]["Networks"]["hassio"]["IPAddress"])
self._meta["NetworkSettings"]["Networks"]["hassio"]["IPAddress"]
)
except (KeyError, TypeError, ValueError):
return ip_address("0.0.0.0")

View File

@ -78,7 +78,7 @@ class DockerInterface(CoreSysAttributes):
Need run inside executor.
"""
image = image or self.image
image = image.partition(':')[0] # remove potential tag
image = image.partition(":")[0] # remove potential tag
try:
_LOGGER.info("Pull image %s tag %s.", image, tag)

View File

@ -14,8 +14,8 @@ class DockerStats:
self._blk_write = 0
try:
self._memory_usage = stats['memory_stats']['usage']
self._memory_limit = stats['memory_stats']['limit']
self._memory_usage = stats["memory_stats"]["usage"]
self._memory_limit = stats["memory_stats"]["limit"]
except KeyError:
self._memory_usage = 0
self._memory_limit = 0
@ -24,35 +24,42 @@ class DockerStats:
self._calc_cpu_percent(stats)
with suppress(KeyError):
self._calc_network(stats['networks'])
self._calc_network(stats["networks"])
with suppress(KeyError):
self._calc_block_io(stats['blkio_stats'])
self._calc_block_io(stats["blkio_stats"])
def _calc_cpu_percent(self, stats):
"""Calculate CPU percent."""
cpu_delta = stats['cpu_stats']['cpu_usage']['total_usage'] - \
stats['precpu_stats']['cpu_usage']['total_usage']
system_delta = stats['cpu_stats']['system_cpu_usage'] - \
stats['precpu_stats']['system_cpu_usage']
cpu_delta = (
stats["cpu_stats"]["cpu_usage"]["total_usage"]
- stats["precpu_stats"]["cpu_usage"]["total_usage"]
)
system_delta = (
stats["cpu_stats"]["system_cpu_usage"]
- stats["precpu_stats"]["system_cpu_usage"]
)
if system_delta > 0.0 and cpu_delta > 0.0:
self._cpu = (cpu_delta / system_delta) * \
len(stats['cpu_stats']['cpu_usage']['percpu_usage']) * 100.0
self._cpu = (
(cpu_delta / system_delta)
* len(stats["cpu_stats"]["cpu_usage"]["percpu_usage"])
* 100.0
)
def _calc_network(self, networks):
"""Calculate Network IO stats."""
for _, stats in networks.items():
self._network_rx += stats['rx_bytes']
self._network_tx += stats['tx_bytes']
self._network_rx += stats["rx_bytes"]
self._network_tx += stats["tx_bytes"]
def _calc_block_io(self, blkio):
"""Calculate block IO stats."""
for stats in blkio['io_service_bytes_recursive']:
if stats['op'] == 'Read':
self._blk_read += stats['value']
elif stats['op'] == 'Write':
self._blk_write += stats['value']
for stats in blkio["io_service_bytes_recursive"]:
if stats["op"] == "Read":
self._blk_read += stats["value"]
elif stats["op"] == "Write":
self._blk_write += stats["value"]
@property
def cpu_percent(self):

View File

@ -54,6 +54,7 @@ RE_YAML_ERROR = re.compile(r"homeassistant\.util\.yaml")
@attr.s(frozen=True)
class ConfigResult:
"""Return object from config check."""
valid = attr.ib()
log = attr.ib()
@ -135,8 +136,9 @@ class HomeAssistant(JsonConfig, CoreSysAttributes):
@property
def api_url(self) -> str:
"""Return API url to Home Assistant."""
return "{}://{}:{}".format('https' if self.api_ssl else 'http',
self.ip_address, self.api_port)
return "{}://{}:{}".format(
"https" if self.api_ssl else "http", self.ip_address, self.api_port
)
@property
def watchdog(self) -> bool:
@ -183,7 +185,7 @@ class HomeAssistant(JsonConfig, CoreSysAttributes):
"""Return image name of the Home Assistant container."""
if self._data.get(ATTR_IMAGE):
return self._data[ATTR_IMAGE]
return os.environ['HOMEASSISTANT_REPOSITORY']
return os.environ["HOMEASSISTANT_REPOSITORY"]
@image.setter
def image(self, value: str):
@ -196,8 +198,7 @@ class HomeAssistant(JsonConfig, CoreSysAttributes):
@property
def is_custom_image(self) -> bool:
"""Return True if a custom image is used."""
return all(
attr in self._data for attr in (ATTR_IMAGE, ATTR_LAST_VERSION))
return all(attr in self._data for attr in (ATTR_IMAGE, ATTR_LAST_VERSION))
@property
def boot(self) -> bool:
@ -235,7 +236,7 @@ class HomeAssistant(JsonConfig, CoreSysAttributes):
_LOGGER.info("Setup HomeAssistant landingpage")
while True:
with suppress(DockerAPIError):
await self.instance.install('landingpage')
await self.instance.install("landingpage")
return
_LOGGER.warning("Fails install landingpage, retry after 30sec")
await asyncio.sleep(30)
@ -407,7 +408,8 @@ class HomeAssistant(JsonConfig, CoreSysAttributes):
async def check_config(self) -> ConfigResult:
"""Run Home Assistant config check."""
result = await self.instance.execute_command(
"python3 -m homeassistant -c /config --script check_config")
"python3 -m homeassistant -c /config --script check_config"
)
# if not valid
if result.exit_code is None:
@ -425,35 +427,42 @@ class HomeAssistant(JsonConfig, CoreSysAttributes):
async def ensure_access_token(self) -> None:
"""Ensures there is an access token."""
if self.access_token is not None and self._access_token_expires > datetime.utcnow():
if (
self.access_token is not None
and self._access_token_expires > datetime.utcnow()
):
return
with suppress(asyncio.TimeoutError, aiohttp.ClientError):
async with self.sys_websession_ssl.post(
f"{self.api_url}/auth/token",
timeout=30,
data={
"grant_type": "refresh_token",
"refresh_token": self.refresh_token
}) as resp:
f"{self.api_url}/auth/token",
timeout=30,
data={
"grant_type": "refresh_token",
"refresh_token": self.refresh_token,
},
) as resp:
if resp.status != 200:
_LOGGER.error("Can't update Home Assistant access token!")
raise HomeAssistantAuthError()
_LOGGER.info("Updated Home Assistant API token")
tokens = await resp.json()
self.access_token = tokens['access_token']
self._access_token_expires = \
datetime.utcnow() + timedelta(seconds=tokens['expires_in'])
self.access_token = tokens["access_token"]
self._access_token_expires = datetime.utcnow() + timedelta(
seconds=tokens["expires_in"]
)
@asynccontextmanager
async def make_request(self,
method: str,
path: str,
json: Optional[Dict[str, Any]] = None,
content_type: Optional[str] = None,
data: Optional[bytes] = None,
timeout=30) -> AsyncContextManager[aiohttp.ClientResponse]:
async def make_request(
self,
method: str,
path: str,
json: Optional[Dict[str, Any]] = None,
content_type: Optional[str] = None,
data: Optional[bytes] = None,
timeout=30,
) -> AsyncContextManager[aiohttp.ClientResponse]:
"""Async context manager to make a request with right auth."""
url = f"{self.api_url}/{path}"
headers = {}
@ -470,12 +479,12 @@ class HomeAssistant(JsonConfig, CoreSysAttributes):
# Prepare Access token
if self.refresh_token:
await self.ensure_access_token()
headers[hdrs.AUTHORIZATION] = f'Bearer {self.access_token}'
headers[hdrs.AUTHORIZATION] = f"Bearer {self.access_token}"
try:
async with getattr(self.sys_websession_ssl, method)(
url, data=data, timeout=timeout, json=json,
headers=headers) as resp:
url, data=data, timeout=timeout, json=json, headers=headers
) as resp:
# Access token expired
if resp.status == 401 and self.refresh_token:
self.access_token = None
@ -491,7 +500,7 @@ class HomeAssistant(JsonConfig, CoreSysAttributes):
async def check_api_state(self) -> bool:
"""Return True if Home Assistant up and running."""
with suppress(HomeAssistantAPIError):
async with self.make_request('get', 'api/') as resp:
async with self.make_request("get", "api/") as resp:
if resp.status in (200, 201):
return True
status = resp.status
@ -503,8 +512,7 @@ class HomeAssistant(JsonConfig, CoreSysAttributes):
"""Block until Home-Assistant is booting up or startup timeout."""
start_time = time.monotonic()
migration_progress = False
migration_file = Path(self.sys_config.path_homeassistant,
'.migration_progress')
migration_file = Path(self.sys_config.path_homeassistant, ".migration_progress")
def check_port():
"""Check if port is mapped."""

View File

@ -8,8 +8,12 @@ from .control import SystemControl
from .info import InfoCenter
from .services import ServiceManager
from ..const import (
FEATURES_REBOOT, FEATURES_SHUTDOWN, FEATURES_HOSTNAME, FEATURES_SERVICES,
FEATURES_HASSOS)
FEATURES_REBOOT,
FEATURES_SHUTDOWN,
FEATURES_HOSTNAME,
FEATURES_SERVICES,
FEATURES_HASSOS,
)
from ..coresys import CoreSysAttributes
from ..exceptions import HassioError
@ -59,11 +63,7 @@ class HostManager(CoreSysAttributes):
features = []
if self.sys_dbus.systemd.is_connected:
features.extend([
FEATURES_REBOOT,
FEATURES_SHUTDOWN,
FEATURES_SERVICES,
])
features.extend([FEATURES_REBOOT, FEATURES_SHUTDOWN, FEATURES_SERVICES])
if self.sys_dbus.hostname.is_connected:
features.append(FEATURES_HOSTNAME)

View File

@ -6,14 +6,13 @@ from string import Template
import attr
from ..const import (
ATTR_INPUT, ATTR_OUTPUT, ATTR_DEVICES, ATTR_NAME, CHAN_ID, CHAN_TYPE)
from ..const import ATTR_INPUT, ATTR_OUTPUT, ATTR_DEVICES, ATTR_NAME, CHAN_ID, CHAN_TYPE
from ..coresys import CoreSysAttributes
_LOGGER = logging.getLogger(__name__)
# pylint: disable=invalid-name
DefaultConfig = attr.make_class('DefaultConfig', ['input', 'output'])
DefaultConfig = attr.make_class("DefaultConfig", ["input", "output"])
class AlsaAudio(CoreSysAttributes):
@ -22,10 +21,7 @@ class AlsaAudio(CoreSysAttributes):
def __init__(self, coresys):
"""Initialize ALSA audio system."""
self.coresys = coresys
self._data = {
ATTR_INPUT: {},
ATTR_OUTPUT: {},
}
self._data = {ATTR_INPUT: {}, ATTR_OUTPUT: {}}
self._cache = 0
self._default = None
@ -66,18 +62,20 @@ class AlsaAudio(CoreSysAttributes):
dev_name = dev_data[ATTR_NAME]
# Lookup type
if chan_type.endswith('playback'):
if chan_type.endswith("playback"):
key = ATTR_OUTPUT
elif chan_type.endswith('capture'):
elif chan_type.endswith("capture"):
key = ATTR_INPUT
else:
_LOGGER.warning("Unknown channel type: %s", chan_type)
continue
# Use name from DB or a generic name
self._data[key][alsa_id] = database.get(
self.sys_machine, {}).get(
dev_name, {}).get(alsa_id, f"{dev_name}: {chan_id}")
self._data[key][alsa_id] = (
database.get(self.sys_machine, {})
.get(dev_name, {})
.get(alsa_id, f"{dev_name}: {chan_id}")
)
self._cache = current_id
@ -88,7 +86,7 @@ class AlsaAudio(CoreSysAttributes):
try:
# pylint: disable=no-member
with json_file.open('r') as database:
with json_file.open("r") as database:
return json.loads(database.read())
except (ValueError, OSError) as err:
_LOGGER.warning("Can't read audio DB: %s", err)
@ -127,7 +125,7 @@ class AlsaAudio(CoreSysAttributes):
asound_file = Path(__file__).parent.joinpath("data/asound.tmpl")
try:
# pylint: disable=no-member
with asound_file.open('r') as asound:
with asound_file.open("r") as asound:
asound_data = asound.read()
except OSError as err:
_LOGGER.error("Can't read asound.tmpl: %s", err)
@ -135,6 +133,4 @@ class AlsaAudio(CoreSysAttributes):
# Process Template
asound_template = Template(asound_data)
return asound_template.safe_substitute(
input=alsa_input, output=alsa_output
)
return asound_template.safe_substitute(input=alsa_input, output=alsa_output)

View File

@ -9,7 +9,7 @@ from ..utils.apparmor import validate_profile
_LOGGER = logging.getLogger(__name__)
SYSTEMD_SERVICES = {'hassos-apparmor.service', 'hassio-apparmor.service'}
SYSTEMD_SERVICES = {"hassos-apparmor.service", "hassio-apparmor.service"}
class AppArmorControl(CoreSysAttributes):
@ -98,8 +98,7 @@ class AppArmorControl(CoreSysAttributes):
return
# Marks als remove and start host process
remove_profile = Path(
self.sys_config.path_apparmor, 'remove', profile_name)
remove_profile = Path(self.sys_config.path_apparmor, "remove", profile_name)
try:
profile_file.rename(remove_profile)
except OSError as err:

View File

@ -6,8 +6,8 @@ from ..exceptions import HostNotSupportedError
_LOGGER = logging.getLogger(__name__)
MANAGER = 'manager'
HOSTNAME = 'hostname'
MANAGER = "manager"
HOSTNAME = "hostname"
class SystemControl(CoreSysAttributes):

View File

@ -18,32 +18,32 @@ class InfoCenter(CoreSysAttributes):
@property
def hostname(self):
"""Return local hostname."""
return self._data.get('StaticHostname') or None
return self._data.get("StaticHostname") or None
@property
def chassis(self):
"""Return local chassis type."""
return self._data.get('Chassis') or None
return self._data.get("Chassis") or None
@property
def deployment(self):
"""Return local deployment type."""
return self._data.get('Deployment') or None
return self._data.get("Deployment") or None
@property
def kernel(self):
"""Return local kernel version."""
return self._data.get('KernelRelease') or None
return self._data.get("KernelRelease") or None
@property
def operating_system(self):
"""Return local operating system."""
return self._data.get('OperatingSystemPrettyName') or None
return self._data.get("OperatingSystemPrettyName") or None
@property
def cpe(self):
"""Return local CPE."""
return self._data.get('OperatingSystemCPEName') or None
return self._data.get("OperatingSystemCPEName") or None
async def update(self):
"""Update properties over dbus."""

View File

@ -8,7 +8,7 @@ from ..exceptions import HassioError, HostNotSupportedError, HostServiceError
_LOGGER = logging.getLogger(__name__)
MOD_REPLACE = 'replace'
MOD_REPLACE = "replace"
class ServiceManager(CoreSysAttributes):
@ -77,8 +77,10 @@ class ServiceManager(CoreSysAttributes):
try:
systemd_units = await self.sys_dbus.systemd.list_units()
for service_data in systemd_units[0]:
if not service_data[0].endswith(".service") or \
service_data[2] != 'loaded':
if (
not service_data[0].endswith(".service")
or service_data[2] != "loaded"
):
continue
self._services.add(ServiceInfo.read_from(service_data))
except (HassioError, IndexError):

View File

@ -130,8 +130,12 @@ class Ingress(JsonConfig, CoreSysAttributes):
async def update_hass_panel(self, addon: Addon):
"""Return True if Home Assistant up and running."""
method = "post" if addon.ingress_panel else "delete"
async with self.sys_homeassistant.make_request(method, f"api/hassio_push/panel/{addon.slug}") as resp:
async with self.sys_homeassistant.make_request(
method, f"api/hassio_push/panel/{addon.slug}"
) as resp:
if resp.status in (200, 201):
_LOGGER.info("Update Ingress as panel for %s", addon.slug)
else:
_LOGGER.warning("Fails Ingress panel for %s with %i", addon.slug, resp.status)
_LOGGER.warning(
"Fails Ingress panel for %s with %i", addon.slug, resp.status
)

View File

@ -24,7 +24,8 @@ class DNSForward:
*shlex.split(COMMAND),
stdin=asyncio.subprocess.DEVNULL,
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.DEVNULL)
stderr=asyncio.subprocess.DEVNULL
)
except OSError as err:
_LOGGER.error("Can't start DNS forwarding: %s", err)
else:

View File

@ -35,8 +35,8 @@ class Hardware:
def serial_devices(self):
"""Return all serial and connected devices."""
dev_list = set()
for device in self.context.list_devices(subsystem='tty'):
if 'ID_VENDOR' in device or RE_TTY.search(device.device_node):
for device in self.context.list_devices(subsystem="tty"):
if "ID_VENDOR" in device or RE_TTY.search(device.device_node):
dev_list.add(device.device_node)
return dev_list
@ -45,9 +45,9 @@ class Hardware:
def input_devices(self):
"""Return all input devices."""
dev_list = set()
for device in self.context.list_devices(subsystem='input'):
if 'NAME' in device:
dev_list.add(device['NAME'].replace('"', ''))
for device in self.context.list_devices(subsystem="input"):
if "NAME" in device:
dev_list.add(device["NAME"].replace('"', ""))
return dev_list
@ -55,8 +55,8 @@ class Hardware:
def disk_devices(self):
"""Return all disk devices."""
dev_list = set()
for device in self.context.list_devices(subsystem='block'):
if device.device_node.startswith('/dev/sd'):
for device in self.context.list_devices(subsystem="block"):
if device.device_node.startswith("/dev/sd"):
dev_list.add(device.device_node)
return dev_list
@ -93,10 +93,9 @@ class Hardware:
# parse devices
for match in RE_DEVICES.finditer(devices):
try:
audio_list[match.group(1)][ATTR_DEVICES].append({
CHAN_ID: match.group(2),
CHAN_TYPE: match.group(3)
})
audio_list[match.group(1)][ATTR_DEVICES].append(
{CHAN_ID: match.group(2), CHAN_TYPE: match.group(3)}
)
except KeyError:
_LOGGER.warning("Wrong audio device found %s", match.group(0))
continue

View File

@ -5,10 +5,10 @@ import logging
_LOGGER = logging.getLogger(__name__)
INTERVAL = 'interval'
REPEAT = 'repeat'
CALL = 'callback'
TASK = 'task'
INTERVAL = "interval"
REPEAT = "repeat"
CALL = "callback"
TASK = "task"
class Scheduler:
@ -28,11 +28,7 @@ class Scheduler:
task_id = hash(coro_callback)
# Generate data
opts = {
CALL: coro_callback,
INTERVAL: interval,
REPEAT: repeat,
}
opts = {CALL: coro_callback, INTERVAL: interval, REPEAT: repeat}
# Schedule task
self._data[task_id] = opts
@ -58,8 +54,7 @@ class Scheduler:
job = self.loop.call_later(interval, self._run_task, task_id)
elif isinstance(interval, time):
today = datetime.combine(date.today(), interval)
tomorrow = datetime.combine(date.today() + timedelta(days=1),
interval)
tomorrow = datetime.combine(date.today() + timedelta(days=1), interval)
# Check if we run it today or next day
if today > datetime.today():
@ -69,8 +64,12 @@ class Scheduler:
job = self.loop.call_at(calc.timestamp(), self._run_task, task_id)
else:
_LOGGER.fatal("Unknown interval %s (type: %s) for scheduler %s",
interval, type(interval), task_id)
_LOGGER.fatal(
"Unknown interval %s (type: %s) for scheduler %s",
interval,
type(interval),
task_id,
)
# Store job
self._data[task_id][TASK] = job

View File

@ -5,8 +5,7 @@ from pathlib import Path
from .snapshot import Snapshot
from .utils import create_slug
from ..const import (
FOLDER_HOMEASSISTANT, SNAPSHOT_FULL, SNAPSHOT_PARTIAL)
from ..const import FOLDER_HOMEASSISTANT, SNAPSHOT_FULL, SNAPSHOT_PARTIAL
from ..coresys import CoreSysAttributes
from ..utils.dt import utcnow
@ -64,8 +63,10 @@ class SnapshotManager(CoreSysAttributes):
if await snapshot.load():
self.snapshots_obj[snapshot.slug] = snapshot
tasks = [_load_snapshot(tar_file) for tar_file in
self.sys_config.path_backup.glob("*.tar")]
tasks = [
_load_snapshot(tar_file)
for tar_file in self.sys_config.path_backup.glob("*.tar")
]
_LOGGER.info("Found %d snapshot files", len(tasks))
if tasks:
@ -149,8 +150,9 @@ class SnapshotManager(CoreSysAttributes):
self.sys_scheduler.suspend = False
self.lock.release()
async def do_snapshot_partial(self, name="", addons=None, folders=None,
password=None):
async def do_snapshot_partial(
self, name="", addons=None, folders=None, password=None
):
"""Create a partial snapshot."""
if self.lock.locked():
_LOGGER.error("A snapshot/restore process is already running")
@ -173,8 +175,7 @@ class SnapshotManager(CoreSysAttributes):
if addon and addon.is_installed:
addon_list.append(addon)
continue
_LOGGER.warning(
"Add-on %s not found/installed", addon_slug)
_LOGGER.warning("Add-on %s not found/installed", addon_slug)
if addon_list:
_LOGGER.info("Snapshot %s store Add-ons", snapshot.slug)
@ -205,8 +206,7 @@ class SnapshotManager(CoreSysAttributes):
return False
if snapshot.sys_type != SNAPSHOT_FULL:
_LOGGER.error("Restore %s is only a partial snapshot!",
snapshot.slug)
_LOGGER.error("Restore %s is only a partial snapshot!", snapshot.slug)
return False
if snapshot.protected and not snapshot.set_password(password):
@ -231,8 +231,9 @@ class SnapshotManager(CoreSysAttributes):
# Start homeassistant restore
_LOGGER.info("Restore %s run Home-Assistant", snapshot.slug)
snapshot.restore_homeassistant()
task_hass = self.sys_create_task(self.sys_homeassistant.update(
snapshot.homeassistant_version))
task_hass = self.sys_create_task(
self.sys_homeassistant.update(snapshot.homeassistant_version)
)
# Restore repositories
_LOGGER.info("Restore %s run Repositories", snapshot.slug)
@ -253,8 +254,7 @@ class SnapshotManager(CoreSysAttributes):
await snapshot.restore_addons()
# finish homeassistant task
_LOGGER.info("Restore %s wait until homeassistant ready",
snapshot.slug)
_LOGGER.info("Restore %s wait until homeassistant ready", snapshot.slug)
await task_hass
await self.sys_homeassistant.start()
@ -270,8 +270,9 @@ class SnapshotManager(CoreSysAttributes):
self.sys_scheduler.suspend = False
self.lock.release()
async def do_restore_partial(self, snapshot, homeassistant=False,
addons=None, folders=None, password=None):
async def do_restore_partial(
self, snapshot, homeassistant=False, addons=None, folders=None, password=None
):
"""Restore a snapshot."""
if self.lock.locked():
_LOGGER.error("A snapshot/restore process is already running")
@ -303,11 +304,10 @@ class SnapshotManager(CoreSysAttributes):
# Process Home-Assistant
task_hass = None
if homeassistant:
_LOGGER.info("Restore %s run Home-Assistant",
snapshot.slug)
_LOGGER.info("Restore %s run Home-Assistant", snapshot.slug)
task_hass = self.sys_create_task(
self.sys_homeassistant.update(
snapshot.homeassistant_version))
self.sys_homeassistant.update(snapshot.homeassistant_version)
)
if addons:
_LOGGER.info("Restore %s old add-ons", snapshot.slug)
@ -315,8 +315,7 @@ class SnapshotManager(CoreSysAttributes):
# Make sure homeassistant run agen
if task_hass:
_LOGGER.info("Restore %s wait for Home-Assistant",
snapshot.slug)
_LOGGER.info("Restore %s wait for Home-Assistant", snapshot.slug)
await task_hass
# Do we need start HomeAssistant?

View File

@ -213,8 +213,7 @@ class Snapshot(CoreSysAttributes):
try:
raw = await self.sys_run_in_executor(_load_file)
except (tarfile.TarError, KeyError) as err:
_LOGGER.error(
"Can't read snapshot tarfile %s: %s", self.tarfile, err)
_LOGGER.error("Can't read snapshot tarfile %s: %s", self.tarfile, err)
return False
# parse data
@ -228,8 +227,11 @@ class Snapshot(CoreSysAttributes):
try:
self._data = SCHEMA_SNAPSHOT(raw_dict)
except vol.Invalid as err:
_LOGGER.error("Can't validate data for %s: %s", self.tarfile,
humanize_error(raw_dict, err))
_LOGGER.error(
"Can't validate data for %s: %s",
self.tarfile,
humanize_error(raw_dict, err),
)
return False
return True
@ -261,8 +263,9 @@ class Snapshot(CoreSysAttributes):
try:
self._data = SCHEMA_SNAPSHOT(self._data)
except vol.Invalid as err:
_LOGGER.error("Invalid data for %s: %s", self.tarfile,
humanize_error(self._data, err))
_LOGGER.error(
"Invalid data for %s: %s", self.tarfile, humanize_error(self._data, err)
)
raise ValueError("Invalid config") from None
# new snapshot, build it
@ -286,8 +289,8 @@ class Snapshot(CoreSysAttributes):
async def _addon_save(addon):
"""Task to store an add-on into snapshot."""
addon_file = SecureTarFile(
Path(self._tmp.name, f"{addon.slug}.tar.gz"),
'w', key=self._key)
Path(self._tmp.name, f"{addon.slug}.tar.gz"), "w", key=self._key
)
# Take snapshot
try:
@ -297,12 +300,14 @@ class Snapshot(CoreSysAttributes):
return
# Store to config
self._data[ATTR_ADDONS].append({
ATTR_SLUG: addon.slug,
ATTR_NAME: addon.name,
ATTR_VERSION: addon.version,
ATTR_SIZE: addon_file.size,
})
self._data[ATTR_ADDONS].append(
{
ATTR_SLUG: addon.slug,
ATTR_NAME: addon.name,
ATTR_VERSION: addon.version,
ATTR_SIZE: addon_file.size,
}
)
# Run tasks
tasks = [_addon_save(addon) for addon in addon_list]
@ -316,7 +321,8 @@ class Snapshot(CoreSysAttributes):
async def _addon_restore(addon_slug):
"""Task to restore an add-on into snapshot."""
addon_file = SecureTarFile(
Path(self._tmp.name, f"{addon_slug}.tar.gz"), 'r', key=self._key)
Path(self._tmp.name, f"{addon_slug}.tar.gz"), "r", key=self._key
)
# If exists inside snapshot
if not addon_file.path.exists():
@ -352,7 +358,7 @@ class Snapshot(CoreSysAttributes):
# Take snapshot
try:
_LOGGER.info("Snapshot folder %s", name)
with SecureTarFile(tar_name, 'w', key=self._key) as tar_file:
with SecureTarFile(tar_name, "w", key=self._key) as tar_file:
tar_file.add(origin_dir, arcname=".")
_LOGGER.info("Snapshot folder %s done", name)
@ -361,8 +367,9 @@ class Snapshot(CoreSysAttributes):
_LOGGER.warning("Can't snapshot folder %s: %s", name, err)
# Run tasks
tasks = [self.sys_run_in_executor(_folder_save, folder)
for folder in folder_list]
tasks = [
self.sys_run_in_executor(_folder_save, folder) for folder in folder_list
]
if tasks:
await asyncio.wait(tasks)
@ -388,15 +395,16 @@ class Snapshot(CoreSysAttributes):
# Perform a restore
try:
_LOGGER.info("Restore folder %s", name)
with SecureTarFile(tar_name, 'r', key=self._key) as tar_file:
with SecureTarFile(tar_name, "r", key=self._key) as tar_file:
tar_file.extractall(path=origin_dir)
_LOGGER.info("Restore folder %s done", name)
except (tarfile.TarError, OSError) as err:
_LOGGER.warning("Can't restore folder %s: %s", name, err)
# Run tasks
tasks = [self.sys_run_in_executor(_folder_restore, folder)
for folder in folder_list]
tasks = [
self.sys_run_in_executor(_folder_restore, folder) for folder in folder_list
]
if tasks:
await asyncio.wait(tasks)
@ -410,16 +418,19 @@ class Snapshot(CoreSysAttributes):
# Custom image
if self.sys_homeassistant.is_custom_image:
self.homeassistant[ATTR_IMAGE] = self.sys_homeassistant.image
self.homeassistant[ATTR_LAST_VERSION] = \
self.sys_homeassistant.latest_version
self.homeassistant[
ATTR_LAST_VERSION
] = self.sys_homeassistant.latest_version
# API/Proxy
self.homeassistant[ATTR_PORT] = self.sys_homeassistant.api_port
self.homeassistant[ATTR_SSL] = self.sys_homeassistant.api_ssl
self.homeassistant[ATTR_REFRESH_TOKEN] = \
self._encrypt_data(self.sys_homeassistant.refresh_token)
self.homeassistant[ATTR_PASSWORD] = \
self._encrypt_data(self.sys_homeassistant.api_password)
self.homeassistant[ATTR_REFRESH_TOKEN] = self._encrypt_data(
self.sys_homeassistant.refresh_token
)
self.homeassistant[ATTR_PASSWORD] = self._encrypt_data(
self.sys_homeassistant.api_password
)
def restore_homeassistant(self):
"""Write all data to the Home Assistant object."""
@ -430,16 +441,19 @@ class Snapshot(CoreSysAttributes):
# Custom image
if self.homeassistant.get(ATTR_IMAGE):
self.sys_homeassistant.image = self.homeassistant[ATTR_IMAGE]
self.sys_homeassistant.latest_version = \
self.homeassistant[ATTR_LAST_VERSION]
self.sys_homeassistant.latest_version = self.homeassistant[
ATTR_LAST_VERSION
]
# API/Proxy
self.sys_homeassistant.api_port = self.homeassistant[ATTR_PORT]
self.sys_homeassistant.api_ssl = self.homeassistant[ATTR_SSL]
self.sys_homeassistant.refresh_token = \
self._decrypt_data(self.homeassistant[ATTR_REFRESH_TOKEN])
self.sys_homeassistant.api_password = \
self._decrypt_data(self.homeassistant[ATTR_PASSWORD])
self.sys_homeassistant.refresh_token = self._decrypt_data(
self.homeassistant[ATTR_REFRESH_TOKEN]
)
self.sys_homeassistant.api_password = self._decrypt_data(
self.homeassistant[ATTR_PASSWORD]
)
# save
self.sys_homeassistant.save_data()

View File

@ -33,16 +33,14 @@ class StoreManager(CoreSysAttributes):
self.data.update()
# Init Hass.io built-in repositories
repositories = \
set(self.sys_config.addons_repositories) | BUILTIN_REPOSITORIES
repositories = set(self.sys_config.addons_repositories) | BUILTIN_REPOSITORIES
# Init custom repositories and load add-ons
await self.update_repositories(repositories)
async def reload(self) -> None:
"""Update add-ons from repository and reload list."""
tasks = [repository.update() for repository in
self.repositories.values()]
tasks = [repository.update() for repository in self.repositories.values()]
if tasks:
await asyncio.wait(tasks)
@ -89,8 +87,12 @@ class StoreManager(CoreSysAttributes):
add_addons = all_addons - set(self.sys_addons.store)
del_addons = set(self.sys_addons.store) - all_addons
_LOGGER.info("Load add-ons from store: %d all - %d new - %d remove",
len(all_addons), len(add_addons), len(del_addons))
_LOGGER.info(
"Load add-ons from store: %d all - %d new - %d remove",
len(all_addons),
len(add_addons),
len(del_addons),
)
# new addons
for slug in add_addons:

View File

@ -45,11 +45,13 @@ class GitRepo(CoreSysAttributes):
async with self.lock:
try:
_LOGGER.info("Load add-on %s repository", self.path)
self.repo = await self.sys_run_in_executor(
git.Repo, str(self.path))
self.repo = await self.sys_run_in_executor(git.Repo, str(self.path))
except (git.InvalidGitRepositoryError, git.NoSuchPathError,
git.GitCommandError) as err:
except (
git.InvalidGitRepositoryError,
git.NoSuchPathError,
git.GitCommandError,
) as err:
_LOGGER.error("Can't load %s repo: %s.", self.path, err)
self._remove()
return False
@ -62,22 +64,27 @@ class GitRepo(CoreSysAttributes):
git_args = {
attribute: value
for attribute, value in (
('recursive', True),
('branch', self.branch),
('depth', 1),
('shallow-submodules', True)
) if value is not None
("recursive", True),
("branch", self.branch),
("depth", 1),
("shallow-submodules", True),
)
if value is not None
}
try:
_LOGGER.info("Clone add-on %s repository", self.url)
self.repo = await self.sys_run_in_executor(ft.partial(
git.Repo.clone_from, self.url, str(self.path),
**git_args
))
self.repo = await self.sys_run_in_executor(
ft.partial(
git.Repo.clone_from, self.url, str(self.path), **git_args
)
)
except (git.InvalidGitRepositoryError, git.NoSuchPathError,
git.GitCommandError) as err:
except (
git.InvalidGitRepositoryError,
git.NoSuchPathError,
git.GitCommandError,
) as err:
_LOGGER.error("Can't clone %s repository: %s.", self.url, err)
self._remove()
return False
@ -96,22 +103,26 @@ class GitRepo(CoreSysAttributes):
try:
# Download data
await self.sys_run_in_executor(ft.partial(
self.repo.remotes.origin.fetch, **{
'update-shallow': True,
'depth': 1,
}))
await self.sys_run_in_executor(
ft.partial(
self.repo.remotes.origin.fetch,
**{"update-shallow": True, "depth": 1},
)
)
# Jump on top of that
await self.sys_run_in_executor(ft.partial(
self.repo.git.reset, f"origin/{branch}", hard=True))
await self.sys_run_in_executor(
ft.partial(self.repo.git.reset, f"origin/{branch}", hard=True)
)
# Cleanup old data
await self.sys_run_in_executor(ft.partial(
self.repo.git.clean, "-xdf"))
await self.sys_run_in_executor(ft.partial(self.repo.git.clean, "-xdf"))
except (git.InvalidGitRepositoryError, git.NoSuchPathError,
git.GitCommandError) as err:
except (
git.InvalidGitRepositoryError,
git.NoSuchPathError,
git.GitCommandError,
) as err:
_LOGGER.error("Can't update %s repo: %s.", self.url, err)
return False
@ -134,8 +145,7 @@ class GitRepoHassIO(GitRepo):
def __init__(self, coresys):
"""Initialize Git Hass.io add-on repository."""
super().__init__(
coresys, coresys.config.path_addons_core, URL_HASSIO_ADDONS)
super().__init__(coresys, coresys.config.path_addons_core, URL_HASSIO_ADDONS)
class GitRepoCustom(GitRepo):
@ -143,9 +153,7 @@ class GitRepoCustom(GitRepo):
def __init__(self, coresys, url):
"""Initialize custom Git Hass.io addo-n repository."""
path = Path(
coresys.config.path_addons_git,
get_hash_from_repository(url))
path = Path(coresys.config.path_addons_git, get_hash_from_repository(url))
super().__init__(coresys, path, url)

View File

@ -2,11 +2,16 @@
from .git import GitRepoHassIO, GitRepoCustom
from .utils import get_hash_from_repository
from ..const import (
REPOSITORY_CORE, REPOSITORY_LOCAL, ATTR_NAME, ATTR_URL, ATTR_MAINTAINER)
REPOSITORY_CORE,
REPOSITORY_LOCAL,
ATTR_NAME,
ATTR_URL,
ATTR_MAINTAINER,
)
from ..coresys import CoreSysAttributes
from ..exceptions import APIError
UNKNOWN = 'unknown'
UNKNOWN = "unknown"
class Repository(CoreSysAttributes):

View File

@ -6,8 +6,11 @@ from ..const import ATTR_NAME, ATTR_URL, ATTR_MAINTAINER
# pylint: disable=no-value-for-parameter
SCHEMA_REPOSITORY_CONFIG = vol.Schema({
vol.Required(ATTR_NAME): vol.Coerce(str),
vol.Optional(ATTR_URL): vol.Url(),
vol.Optional(ATTR_MAINTAINER): vol.Coerce(str),
}, extra=vol.REMOVE_EXTRA)
SCHEMA_REPOSITORY_CONFIG = vol.Schema(
{
vol.Required(ATTR_NAME): vol.Coerce(str),
vol.Optional(ATTR_URL): vol.Url(),
vol.Optional(ATTR_MAINTAINER): vol.Coerce(str),
},
extra=vol.REMOVE_EXTRA,
)

View File

@ -8,8 +8,14 @@ import logging
import aiohttp
from .const import (
URL_HASSIO_VERSION, FILE_HASSIO_UPDATER, ATTR_HOMEASSISTANT, ATTR_HASSIO,
ATTR_CHANNEL, ATTR_HASSOS, ATTR_HASSOS_CLI)
URL_HASSIO_VERSION,
FILE_HASSIO_UPDATER,
ATTR_HOMEASSISTANT,
ATTR_HASSIO,
ATTR_CHANNEL,
ATTR_HASSOS,
ATTR_HASSOS_CLI,
)
from .coresys import CoreSysAttributes
from .utils import AsyncThrottle
from .utils.json import JsonConfig
@ -74,7 +80,7 @@ class Updater(JsonConfig, CoreSysAttributes):
Is a coroutine.
"""
url = URL_HASSIO_VERSION.format(channel=self.channel)
machine = self.sys_machine or 'default'
machine = self.sys_machine or "default"
board = self.sys_hassos.board
try:
@ -97,15 +103,15 @@ class Updater(JsonConfig, CoreSysAttributes):
try:
# update supervisor version
self._data[ATTR_HASSIO] = data['supervisor']
self._data[ATTR_HASSIO] = data["supervisor"]
# update Home Assistant version
self._data[ATTR_HOMEASSISTANT] = data['homeassistant'][machine]
self._data[ATTR_HOMEASSISTANT] = data["homeassistant"][machine]
# update hassos version
if self.sys_hassos.available and board:
self._data[ATTR_HASSOS] = data['hassos'][board]
self._data[ATTR_HASSOS_CLI] = data['hassos-cli']
self._data[ATTR_HASSOS] = data["hassos"][board]
self._data[ATTR_HASSOS_CLI] = data["hassos-cli"]
except KeyError as err:
_LOGGER.warning("Can't process version data: %s", err)

View File

@ -14,7 +14,7 @@ def get_profile_name(profile_file):
profiles = set()
try:
with profile_file.open('r') as profile_data:
with profile_file.open("r") as profile_data:
for line in profile_data:
match = RE_PROFILE.match(line)
if not match:
@ -45,21 +45,20 @@ def adjust_profile(profile_name, profile_file, profile_new):
# Process old data
try:
with profile_file.open('r') as profile:
with profile_file.open("r") as profile:
for line in profile:
match = RE_PROFILE.match(line)
if not match:
profile_data.append(line)
else:
profile_data.append(
line.replace(org_profile, profile_name))
profile_data.append(line.replace(org_profile, profile_name))
except OSError as err:
_LOGGER.error("Can't adjust origin profile: %s", err)
raise AppArmorFileError()
# Write into new file
try:
with profile_new.open('w') as profile:
with profile_new.open("w") as profile:
profile.writelines(profile_data)
except OSError as err:
_LOGGER.error("Can't write new profile: %s", err)

View File

@ -14,9 +14,11 @@ _LOGGER = logging.getLogger(__name__)
# Use to convert GVariant into json
RE_GVARIANT_TYPE = re.compile(
r"(?:boolean|byte|int16|uint16|int32|uint32|handle|int64|uint64|double|"
r"string|objectpath|signature) ")
r"string|objectpath|signature) "
)
RE_GVARIANT_VARIANT = re.compile(
r"(?<=(?: |{|\[))<((?:'|\").*?(?:'|\")|\d+(?:\.\d+)?)>(?=(?:|]|}|,))")
r"(?<=(?: |{|\[))<((?:'|\").*?(?:'|\")|\d+(?:\.\d+)?)>(?=(?:|]|}|,))"
)
RE_GVARIANT_STRING = re.compile(r"(?<=(?: |{|\[|\())'(.*?)'(?=(?:|]|}|,|\)))")
RE_GVARIANT_TUPLE_O = re.compile(r"\"[^\"]*?\"|(\()")
RE_GVARIANT_TUPLE_C = re.compile(r"\"[^\"]*?\"|(,?\))")
@ -24,13 +26,14 @@ RE_GVARIANT_TUPLE_C = re.compile(r"\"[^\"]*?\"|(,?\))")
RE_MONITOR_OUTPUT = re.compile(r".+?: (?P<signal>[^ ].+) (?P<data>.*)")
# Commands for dbus
INTROSPECT = ("gdbus introspect --system --dest {bus} "
"--object-path {object} --xml")
CALL = ("gdbus call --system --dest {bus} --object-path {object} "
"--method {method} {args}")
MONITOR = ("gdbus monitor --system --dest {bus}")
INTROSPECT = "gdbus introspect --system --dest {bus} " "--object-path {object} --xml"
CALL = (
"gdbus call --system --dest {bus} --object-path {object} "
"--method {method} {args}"
)
MONITOR = "gdbus monitor --system --dest {bus}"
DBUS_METHOD_GETALL = 'org.freedesktop.DBus.Properties.GetAll'
DBUS_METHOD_GETALL = "org.freedesktop.DBus.Properties.GetAll"
class DBus:
@ -54,10 +57,9 @@ class DBus:
async def _init_proxy(self):
"""Read interface data."""
command = shlex.split(INTROSPECT.format(
bus=self.bus_name,
object=self.object_path
))
command = shlex.split(
INTROSPECT.format(bus=self.bus_name, object=self.object_path)
)
# Ask data
_LOGGER.info("Introspect %s on %s", self.bus_name, self.object_path)
@ -73,16 +75,16 @@ class DBus:
# Read available methods
_LOGGER.debug("data: %s", data)
for interface in xml.findall("./interface"):
interface_name = interface.get('name')
interface_name = interface.get("name")
# Methods
for method in interface.findall("./method"):
method_name = method.get('name')
method_name = method.get("name")
self.methods.add(f"{interface_name}.{method_name}")
# Signals
for signal in interface.findall("./signal"):
signal_name = signal.get('name')
signal_name = signal.get("name")
self.signals.add(f"{interface_name}.{signal_name}")
@staticmethod
@ -92,9 +94,11 @@ class DBus:
raw = RE_GVARIANT_VARIANT.sub(r"\1", raw)
raw = RE_GVARIANT_STRING.sub(r'"\1"', raw)
raw = RE_GVARIANT_TUPLE_O.sub(
lambda x: x.group(0) if not x.group(1) else"[", raw)
lambda x: x.group(0) if not x.group(1) else "[", raw
)
raw = RE_GVARIANT_TUPLE_C.sub(
lambda x: x.group(0) if not x.group(1) else"]", raw)
lambda x: x.group(0) if not x.group(1) else "]", raw
)
# No data
if raw.startswith("[]"):
@ -116,7 +120,7 @@ class DBus:
elif isinstance(arg, (int, float)):
gvariant += f" {arg}"
elif isinstance(arg, str):
gvariant += f" \"{arg}\""
gvariant += f' "{arg}"'
else:
gvariant += " {}".format(str(arg))
@ -124,12 +128,14 @@ class DBus:
async def call_dbus(self, method, *args):
"""Call a dbus method."""
command = shlex.split(CALL.format(
bus=self.bus_name,
object=self.object_path,
method=method,
args=self.gvariant_args(args)
))
command = shlex.split(
CALL.format(
bus=self.bus_name,
object=self.object_path,
method=method,
args=self.gvariant_args(args),
)
)
# Run command
_LOGGER.info("Call %s on %s", method, self.object_path)
@ -155,7 +161,7 @@ class DBus:
*command,
stdin=asyncio.subprocess.DEVNULL,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
stderr=asyncio.subprocess.PIPE,
)
data, error = await proc.communicate()
@ -229,14 +235,12 @@ class DBusSignalWrapper:
async def __aenter__(self):
"""Start monitor events."""
_LOGGER.info("Start dbus monitor on %s", self.dbus.bus_name)
command = shlex.split(MONITOR.format(
bus=self.dbus.bus_name
))
command = shlex.split(MONITOR.format(bus=self.dbus.bus_name))
self._proc = await asyncio.create_subprocess_exec(
*command,
stdin=asyncio.subprocess.DEVNULL,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
stderr=asyncio.subprocess.PIPE,
)
return self
@ -271,8 +275,8 @@ class DBusSignalWrapper:
match = RE_MONITOR_OUTPUT.match(data.decode())
if not match:
continue
signal = match.group('signal')
data = match.group('data')
signal = match.group("signal")
data = match.group("data")
# Filter signals?
if self._signals and signal not in self._signals: