Collection of type hinting fixes (#1747)

This commit is contained in:
Franck Nijhof 2020-05-23 13:19:53 +02:00 committed by GitHub
parent 383657e8ce
commit 49de5be44e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 87 additions and 76 deletions

View File

@ -37,7 +37,7 @@ class AddonManager(CoreSysAttributes):
@property
def all(self) -> List[AnyAddon]:
"""Return a list of all add-ons."""
addons = {**self.store, **self.local}
addons: Dict[str, AnyAddon] = {**self.store, **self.local}
return list(addons.values())
@property
@ -142,7 +142,7 @@ class AddonManager(CoreSysAttributes):
if slug not in self.local:
_LOGGER.warning("Add-on %s is not installed", slug)
return
addon = self.local.get(slug)
addon = self.local[slug]
try:
await addon.instance.remove()
@ -191,12 +191,12 @@ class AddonManager(CoreSysAttributes):
if slug not in self.local:
_LOGGER.error("Add-on %s is not installed", slug)
raise AddonsError()
addon = self.local.get(slug)
addon = self.local[slug]
if addon.is_detached:
_LOGGER.error("Add-on %s is not available inside store", slug)
raise AddonsError()
store = self.store.get(slug)
store = self.store[slug]
if addon.version == store.version:
_LOGGER.warning("No update available for add-on %s", slug)
@ -233,12 +233,12 @@ class AddonManager(CoreSysAttributes):
if slug not in self.local:
_LOGGER.error("Add-on %s is not installed", slug)
raise AddonsError()
addon = self.local.get(slug)
addon = self.local[slug]
if addon.is_detached:
_LOGGER.error("Add-on %s is not available inside store", slug)
raise AddonsError()
store = self.store.get(slug)
store = self.store[slug]
# Check if a rebuild is possible now
if addon.version != store.version:

View File

@ -215,7 +215,8 @@ class AddonModel(CoreSysAttributes):
services = {}
for data in services_list:
service = RE_SERVICE.match(data)
services[service.group("service")] = service.group("rights")
if service:
services[service.group("service")] = service.group("rights")
return services
@ -464,6 +465,8 @@ class AddonModel(CoreSysAttributes):
volumes = {}
for volume in self.data[ATTR_MAP]:
result = RE_VOLUME.match(volume)
if not result:
continue
volumes[result.group(1)] = result.group(2) or "ro"
return volumes

View File

@ -2,7 +2,7 @@
import logging
import re
import secrets
from typing import Any, Dict, List
from typing import Any, Dict, List, Union
import uuid
import voluptuous as vol
@ -385,6 +385,9 @@ def _single_validate(coresys: CoreSys, typ: str, value: Any, key: str):
# parse extend data from type
match = RE_SCHEMA_ELEMENT.match(typ)
if not match:
raise vol.Invalid(f"Unknown type {typ}")
# prepare range
range_args = {}
for group_name in _SCHEMA_LENGTH_PARTS:
@ -462,7 +465,7 @@ def _check_missing_options(origin, exists, root):
def schema_ui_options(raw_schema: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Generate UI schema."""
ui_schema = []
ui_schema: List[Dict[str, Any]] = []
# read options
for key, value in raw_schema.items():
@ -483,7 +486,7 @@ def _single_ui_option(
ui_schema: List[Dict[str, Any]], value: str, key: str, multiple: bool = False
) -> None:
"""Validate a single element."""
ui_node = {"name": key}
ui_node: Dict[str, Union[str, bool, float, List[str]]] = {"name": key}
# If multiple
if multiple:
@ -491,6 +494,8 @@ def _single_ui_option(
# Parse extend data from type
match = RE_SCHEMA_ELEMENT.match(value)
if not match:
return
# Prepare range
for group_name in _SCHEMA_LENGTH_PARTS:

View File

@ -33,7 +33,7 @@ from ..const import (
)
from ..coresys import CoreSysAttributes
from ..exceptions import APIError
from ..validate import docker_image, network_port, complex_version
from ..validate import complex_version, docker_image, network_port
from .utils import api_process, api_process_raw, api_validate
_LOGGER: logging.Logger = logging.getLogger(__name__)

View File

@ -43,7 +43,7 @@ from ..const import (
from ..coresys import CoreSysAttributes
from ..exceptions import APIError
from ..utils.validate import validate_timezone
from ..validate import repositories, wait_boot, simple_version
from ..validate import repositories, simple_version, wait_boot
from .utils import api_process, api_process_raw, api_validate
_LOGGER: logging.Logger = logging.getLogger(__name__)

View File

@ -45,7 +45,7 @@ _LOGGER: logging.Logger = logging.getLogger(__name__)
MACHINE_ID = Path("/etc/machine-id")
async def initialize_coresys():
async def initialize_coresys() -> None:
"""Initialize supervisor coresys/objects."""
coresys = CoreSys()
@ -92,7 +92,7 @@ async def initialize_coresys():
return coresys
def initialize_system_data(coresys: CoreSys):
def initialize_system_data(coresys: CoreSys) -> None:
"""Set up the default configuration and create folders."""
config = coresys.config
@ -168,7 +168,7 @@ def initialize_system_data(coresys: CoreSys):
coresys.config.debug = True
def migrate_system_env(coresys: CoreSys):
def migrate_system_env(coresys: CoreSys) -> None:
"""Cleanup some stuff after update."""
config = coresys.config
@ -181,7 +181,7 @@ def migrate_system_env(coresys: CoreSys):
_LOGGER.warning("Can't cleanup old Add-on build directory")
def initialize_logging():
def initialize_logging() -> None:
"""Initialize the logging."""
logging.basicConfig(level=logging.INFO)
fmt = "%(asctime)s %(levelname)s (%(threadName)s) [%(name)s] %(message)s"
@ -237,7 +237,7 @@ def check_environment() -> None:
_LOGGER.critical("Can't find gdbus!")
def reg_signal(loop):
def reg_signal(loop) -> None:
"""Register SIGTERM and SIGKILL to stop system."""
try:
loop.add_signal_handler(signal.SIGTERM, lambda: loop.call_soon(loop.stop))

View File

@ -3,7 +3,7 @@ from datetime import datetime
import logging
import os
from pathlib import Path, PurePath
from typing import Optional
from typing import List, Optional
from .const import (
ATTR_ADDONS_CUSTOM_LIST,
@ -52,12 +52,12 @@ class CoreConfig(JsonConfig):
super().__init__(FILE_HASSIO_CONFIG, SCHEMA_SUPERVISOR_CONFIG)
@property
def timezone(self):
def timezone(self) -> str:
"""Return system timezone."""
return self._data[ATTR_TIMEZONE]
@timezone.setter
def timezone(self, value):
def timezone(self, value: str) -> None:
"""Set system timezone."""
self._data[ATTR_TIMEZONE] = value
@ -67,7 +67,7 @@ class CoreConfig(JsonConfig):
return self._data.get(ATTR_VERSION)
@version.setter
def version(self, value: str):
def version(self, value: str) -> None:
"""Set config version."""
self._data[ATTR_VERSION] = value
@ -77,7 +77,7 @@ class CoreConfig(JsonConfig):
return self._data[ATTR_WAIT_BOOT]
@wait_boot.setter
def wait_boot(self, value: int):
def wait_boot(self, value: int) -> None:
"""Set wait boot time."""
self._data[ATTR_WAIT_BOOT] = value
@ -87,7 +87,7 @@ class CoreConfig(JsonConfig):
return self._data[ATTR_DEBUG]
@debug.setter
def debug(self, value: bool):
def debug(self, value: bool) -> None:
"""Set debug mode."""
self._data[ATTR_DEBUG] = value
@ -97,7 +97,7 @@ class CoreConfig(JsonConfig):
return self._data[ATTR_DEBUG_BLOCK]
@debug_block.setter
def debug_block(self, value: bool):
def debug_block(self, value: bool) -> None:
"""Set debug wait mode."""
self._data[ATTR_DEBUG_BLOCK] = value
@ -107,7 +107,7 @@ class CoreConfig(JsonConfig):
return self._data[ATTR_LOGGING]
@logging.setter
def logging(self, value: LogLevel):
def logging(self, value: LogLevel) -> None:
"""Set system log level."""
self._data[ATTR_LOGGING] = value
self.modify_log_level()
@ -118,7 +118,7 @@ class CoreConfig(JsonConfig):
logging.getLogger("supervisor").setLevel(lvl)
@property
def last_boot(self):
def last_boot(self) -> datetime:
"""Return last boot datetime."""
boot_str = self._data.get(ATTR_LAST_BOOT, DEFAULT_BOOT_TIME)
@ -128,138 +128,138 @@ class CoreConfig(JsonConfig):
return boot_time
@last_boot.setter
def last_boot(self, value):
def last_boot(self, value: datetime) -> None:
"""Set last boot datetime."""
self._data[ATTR_LAST_BOOT] = value.isoformat()
@property
def path_supervisor(self):
def path_supervisor(self) -> Path:
"""Return Supervisor data path."""
return SUPERVISOR_DATA
@property
def path_extern_supervisor(self):
def path_extern_supervisor(self) -> PurePath:
"""Return Supervisor data path external for Docker."""
return PurePath(os.environ[ENV_SUPERVISOR_SHARE])
@property
def path_extern_homeassistant(self):
def path_extern_homeassistant(self) -> str:
"""Return config path external for Docker."""
return str(PurePath(self.path_extern_supervisor, HOMEASSISTANT_CONFIG))
@property
def path_homeassistant(self):
def path_homeassistant(self) -> Path:
"""Return config path inside supervisor."""
return Path(SUPERVISOR_DATA, HOMEASSISTANT_CONFIG)
@property
def path_extern_ssl(self):
def path_extern_ssl(self) -> str:
"""Return SSL path external for Docker."""
return str(PurePath(self.path_extern_supervisor, HASSIO_SSL))
@property
def path_ssl(self):
def path_ssl(self) -> Path:
"""Return SSL path inside supervisor."""
return Path(SUPERVISOR_DATA, HASSIO_SSL)
@property
def path_addons_core(self):
def path_addons_core(self) -> Path:
"""Return git path for core Add-ons."""
return Path(SUPERVISOR_DATA, ADDONS_CORE)
@property
def path_addons_git(self):
def path_addons_git(self) -> Path:
"""Return path for Git Add-on."""
return Path(SUPERVISOR_DATA, ADDONS_GIT)
@property
def path_addons_local(self):
def path_addons_local(self) -> Path:
"""Return path for custom Add-ons."""
return Path(SUPERVISOR_DATA, ADDONS_LOCAL)
@property
def path_extern_addons_local(self):
def path_extern_addons_local(self) -> PurePath:
"""Return path for custom Add-ons."""
return PurePath(self.path_extern_supervisor, ADDONS_LOCAL)
@property
def path_addons_data(self):
def path_addons_data(self) -> Path:
"""Return root Add-on data folder."""
return Path(SUPERVISOR_DATA, ADDONS_DATA)
@property
def path_extern_addons_data(self):
def path_extern_addons_data(self) -> PurePath:
"""Return root add-on data folder external for Docker."""
return PurePath(self.path_extern_supervisor, ADDONS_DATA)
@property
def path_audio(self):
def path_audio(self) -> Path:
"""Return root audio data folder."""
return Path(SUPERVISOR_DATA, AUDIO_DATA)
@property
def path_extern_audio(self):
def path_extern_audio(self) -> PurePath:
"""Return root audio data folder external for Docker."""
return PurePath(self.path_extern_supervisor, AUDIO_DATA)
@property
def path_tmp(self):
def path_tmp(self) -> Path:
"""Return Supervisor temp folder."""
return Path(SUPERVISOR_DATA, TMP_DATA)
@property
def path_extern_tmp(self):
def path_extern_tmp(self) -> PurePath:
"""Return Supervisor temp folder for Docker."""
return PurePath(self.path_extern_supervisor, TMP_DATA)
@property
def path_backup(self):
def path_backup(self) -> Path:
"""Return root backup data folder."""
return Path(SUPERVISOR_DATA, BACKUP_DATA)
@property
def path_extern_backup(self):
def path_extern_backup(self) -> PurePath:
"""Return root backup data folder external for Docker."""
return PurePath(self.path_extern_supervisor, BACKUP_DATA)
@property
def path_share(self):
def path_share(self) -> Path:
"""Return root share data folder."""
return Path(SUPERVISOR_DATA, SHARE_DATA)
@property
def path_apparmor(self):
def path_apparmor(self) -> Path:
"""Return root Apparmor profile folder."""
return Path(SUPERVISOR_DATA, APPARMOR_DATA)
@property
def path_extern_share(self):
def path_extern_share(self) -> PurePath:
"""Return root share data folder external for Docker."""
return PurePath(self.path_extern_supervisor, SHARE_DATA)
@property
def path_extern_dns(self):
def path_extern_dns(self) -> str:
"""Return dns path external for Docker."""
return str(PurePath(self.path_extern_supervisor, DNS_DATA))
@property
def path_dns(self):
def path_dns(self) -> Path:
"""Return dns path inside supervisor."""
return Path(SUPERVISOR_DATA, DNS_DATA)
@property
def addons_repositories(self):
def addons_repositories(self) -> List[str]:
"""Return list of custom Add-on repositories."""
return self._data[ATTR_ADDONS_CUSTOM_LIST]
def add_addon_repository(self, repo):
def add_addon_repository(self, repo: str) -> None:
"""Add a custom repository to list."""
if repo in self._data[ATTR_ADDONS_CUSTOM_LIST]:
return
self._data[ATTR_ADDONS_CUSTOM_LIST].append(repo)
def drop_addon_repository(self, repo):
def drop_addon_repository(self, repo: str) -> None:
"""Remove a custom repository from list."""
if repo not in self._data[ATTR_ADDONS_CUSTOM_LIST]:
return

View File

@ -95,7 +95,7 @@ class DockerAPI:
version: str = "latest",
dns: bool = True,
ipv4: Optional[IPv4Address] = None,
**kwargs: Dict[str, Any],
**kwargs: Any,
) -> docker.models.containers.Container:
"""Create a Docker container and run it.
@ -153,7 +153,7 @@ class DockerAPI:
image: str,
version: str = "latest",
command: Optional[str] = None,
**kwargs: Dict[str, Any],
**kwargs: Any,
) -> CommandReturn:
"""Create a temporary container and run command.

View File

@ -26,7 +26,7 @@ class DockerInterface(CoreSysAttributes):
self.lock: asyncio.Lock = asyncio.Lock()
@property
def timeout(self) -> str:
def timeout(self) -> int:
"""Return timeout for Docker actions."""
return 30

View File

@ -91,10 +91,10 @@ class DockerNetwork:
Need run inside executor.
"""
ipv4 = str(ipv4) if ipv4 else None
ipv4_address = str(ipv4) if ipv4 else None
try:
self.network.connect(container, aliases=alias, ipv4_address=ipv4)
self.network.connect(container, aliases=alias, ipv4_address=ipv4_address)
except docker.errors.APIError as err:
_LOGGER.error("Can't link container to hassio-net: %s", err)
raise DockerAPIError() from None

View File

@ -56,7 +56,7 @@ class HassOS(CoreSysAttributes):
_LOGGER.error("No HassOS available")
raise HassOSNotSupportedError()
async def _download_raucb(self, version: str) -> None:
async def _download_raucb(self, version: str) -> Path:
"""Download rauc bundle (OTA) from github."""
url = URL_HASSOS_OTA.format(version=version, board=self.board)
raucb = Path(self.sys_config.path_tmp, f"hassos-{version}.raucb")
@ -158,7 +158,7 @@ class HassOS(CoreSysAttributes):
_LOGGER.error("HassOS update fails with: %s", self.sys_dbus.rauc.last_error)
raise HassOSUpdateError()
async def mark_healthy(self):
async def mark_healthy(self) -> None:
"""Set booted partition as good for rauc."""
try:
response = await self.sys_dbus.rauc.mark(RaucState.GOOD, "booted")

View File

@ -5,7 +5,7 @@ Code: https://github.com/home-assistant/plugin-audio
import asyncio
from contextlib import suppress
import logging
from pathlib import Path
from pathlib import Path, PurePath
import shutil
from typing import Awaitable, Optional
@ -36,12 +36,12 @@ class Audio(JsonConfig, CoreSysAttributes):
self.client_template: Optional[jinja2.Template] = None
@property
def path_extern_pulse(self) -> Path:
def path_extern_pulse(self) -> PurePath:
"""Return path of pulse socket file."""
return self.sys_config.path_extern_audio.joinpath("external")
@property
def path_extern_asound(self) -> Path:
def path_extern_asound(self) -> PurePath:
"""Return path of default asound config file."""
return self.sys_config.path_extern_audio.joinpath("asound")

View File

@ -390,6 +390,7 @@ class CoreDNS(JsonConfig, CoreSysAttributes):
if name not in entry.names:
continue
return entry
return None
def logs(self) -> Awaitable[bytes]:
"""Get CoreDNS docker logs.

View File

@ -3,7 +3,7 @@
import voluptuous as vol
from ..const import ATTR_ACCESS_TOKEN, ATTR_IMAGE, ATTR_SERVERS, ATTR_VERSION
from ..validate import dns_server_list, docker_image, token, simple_version
from ..validate import dns_server_list, docker_image, simple_version, token
SCHEMA_DNS_CONFIG = vol.Schema(
{

View File

@ -2,7 +2,7 @@
from datetime import timedelta
import logging
from pathlib import Path
from typing import Dict, Union
from typing import Dict, Optional, Union
from ruamel.yaml import YAML, YAMLError
@ -25,7 +25,7 @@ class SecretsManager(CoreSysAttributes):
"""Return path to secret file."""
return Path(self.sys_config.path_homeassistant, "secrets.yaml")
def get(self, secret: str) -> Union[bool, float, int, str]:
def get(self, secret: str) -> Optional[Union[bool, float, int, str]]:
"""Get secret from store."""
_LOGGER.info("Request secret %s", secret)
return self.secrets.get(secret)

View File

@ -31,7 +31,7 @@ from ..const import (
SNAPSHOT_FULL,
SNAPSHOT_PARTIAL,
)
from ..validate import docker_image, network_port, repositories, complex_version
from ..validate import complex_version, docker_image, network_port, repositories
ALL_FOLDERS = [FOLDER_HOMEASSISTANT, FOLDER_SHARE, FOLDER_ADDONS, FOLDER_SSL]

View File

@ -20,21 +20,23 @@ from ..exceptions import (
_LOGGER: logging.Logger = logging.getLogger(__name__)
# Use to convert GVariant into json
RE_GVARIANT_TYPE: re.Match = re.compile(
RE_GVARIANT_TYPE: re.Pattern[Any] = re.compile(
r"\"[^\"\\]*(?:\\.[^\"\\]*)*\"|(boolean|byte|int16|uint16|int32|uint32|handle|int64|uint64|double|"
r"string|objectpath|signature|@[asviumodf\{\}]+) "
)
RE_GVARIANT_VARIANT: re.Match = re.compile(r"\"[^\"\\]*(?:\\.[^\"\\]*)*\"|(<|>)")
RE_GVARIANT_STRING_ESC: re.Match = re.compile(
RE_GVARIANT_VARIANT: re.Pattern[Any] = re.compile(r"\"[^\"\\]*(?:\\.[^\"\\]*)*\"|(<|>)")
RE_GVARIANT_STRING_ESC: re.Pattern[Any] = re.compile(
r"(?<=(?: |{|\[|\(|<))'[^']*?\"[^']*?'(?=(?:|]|}|,|\)|>))"
)
RE_GVARIANT_STRING: re.Match = re.compile(
RE_GVARIANT_STRING: re.Pattern[Any] = re.compile(
r"(?<=(?: |{|\[|\(|<))'(.*?)'(?=(?:|]|}|,|\)|>))"
)
RE_GVARIANT_TUPLE_O: re.Match = re.compile(r"\"[^\"\\]*(?:\\.[^\"\\]*)*\"|(\()")
RE_GVARIANT_TUPLE_C: re.Match = re.compile(r"\"[^\"\\]*(?:\\.[^\"\\]*)*\"|(,?\))")
RE_GVARIANT_TUPLE_O: re.Pattern[Any] = re.compile(r"\"[^\"\\]*(?:\\.[^\"\\]*)*\"|(\()")
RE_GVARIANT_TUPLE_C: re.Pattern[Any] = re.compile(
r"\"[^\"\\]*(?:\\.[^\"\\]*)*\"|(,?\))"
)
RE_MONITOR_OUTPUT: re.Match = re.compile(r".+?: (?P<signal>[^ ].+) (?P<data>.*)")
RE_MONITOR_OUTPUT: re.Pattern[Any] = re.compile(r".+?: (?P<signal>[^ ].+) (?P<data>.*)")
# Map GDBus to errors
MAP_GDBUS_ERROR: Dict[str, Any] = {

View File

@ -1,11 +1,11 @@
"""Validate functions."""
import ipaddress
import re
import uuid
from typing import Optional, Union
import uuid
import voluptuous as vol
from packaging import version as pkg_version
import voluptuous as vol
from .const import (
ATTR_ACCESS_TOKEN,