Bump pyupgrade from 2.26.0 to 2.26.0.post1 (#3131)

* Bump pyupgrade from 2.26.0 to 2.26.0.post1

Bumps [pyupgrade](https://github.com/asottile/pyupgrade) from 2.26.0 to 2.26.0.post1.
- [Release notes](https://github.com/asottile/pyupgrade/releases)
- [Commits](https://github.com/asottile/pyupgrade/commits)

---
updated-dependencies:
- dependency-name: pyupgrade
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>

* Update .pre-commit-config.yaml

* Fixes

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Pascal Vizeli <pvizeli@syshack.ch>
This commit is contained in:
dependabot[bot] 2021-09-21 18:22:56 +02:00 committed by GitHub
parent 04f36e92e1
commit dafc2cfec2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
120 changed files with 491 additions and 524 deletions

View File

@ -28,7 +28,7 @@ repos:
hooks:
- id: isort
- repo: https://github.com/asottile/pyupgrade
rev: v2.6.2
rev: v2.26.0
hooks:
- id: pyupgrade
args: [--py37-plus]
args: [--py39-plus]

View File

@ -11,4 +11,4 @@ pytest-asyncio==0.12.0 # NB!: Versions over 0.12.0 breaks pytest-aiohttp (https:
pytest-cov==2.12.1
pytest-timeout==1.4.2
pytest==6.2.5
pyupgrade==2.26.0
pyupgrade==2.26.0.post1

View File

@ -3,7 +3,7 @@ import asyncio
from contextlib import suppress
import logging
import tarfile
from typing import Dict, List, Optional, Union
from typing import Optional, Union
from ..const import AddonBoot, AddonStartup, AddonState
from ..coresys import CoreSys, CoreSysAttributes
@ -38,17 +38,17 @@ class AddonManager(CoreSysAttributes):
"""Initialize Docker base wrapper."""
self.coresys: CoreSys = coresys
self.data: AddonsData = AddonsData(coresys)
self.local: Dict[str, Addon] = {}
self.store: Dict[str, AddonStore] = {}
self.local: dict[str, Addon] = {}
self.store: dict[str, AddonStore] = {}
@property
def all(self) -> List[AnyAddon]:
def all(self) -> list[AnyAddon]:
"""Return a list of all add-ons."""
addons: Dict[str, AnyAddon] = {**self.store, **self.local}
addons: dict[str, AnyAddon] = {**self.store, **self.local}
return list(addons.values())
@property
def installed(self) -> List[Addon]:
def installed(self) -> list[Addon]:
"""Return a list of all installed add-ons."""
return list(self.local.values())
@ -89,7 +89,7 @@ class AddonManager(CoreSysAttributes):
async def boot(self, stage: AddonStartup) -> None:
"""Boot add-ons with mode auto."""
tasks: List[Addon] = []
tasks: list[Addon] = []
for addon in self.installed:
if addon.boot != AddonBoot.AUTO or addon.startup != stage:
continue
@ -123,7 +123,7 @@ class AddonManager(CoreSysAttributes):
async def shutdown(self, stage: AddonStartup) -> None:
"""Shutdown addons."""
tasks: List[Addon] = []
tasks: list[Addon] = []
for addon in self.installed:
if addon.state != AddonState.STARTED or addon.startup != stage:
continue
@ -371,7 +371,7 @@ class AddonManager(CoreSysAttributes):
@Job(conditions=[JobCondition.FREE_SPACE, JobCondition.INTERNET_HOST])
async def repair(self) -> None:
"""Repair local add-ons."""
needs_repair: List[Addon] = []
needs_repair: list[Addon] = []
# Evaluate Add-ons to repair
for addon in self.installed:

View File

@ -10,7 +10,7 @@ import secrets
import shutil
import tarfile
from tempfile import TemporaryDirectory
from typing import Any, Awaitable, Dict, List, Optional, Set
from typing import Any, Awaitable, Optional
import aiohttp
import voluptuous as vol
@ -187,17 +187,17 @@ class Addon(AddonModel):
return self.version != self.latest_version
@property
def dns(self) -> List[str]:
def dns(self) -> list[str]:
"""Return list of DNS name for that add-on."""
return [f"{self.hostname}.{DNS_SUFFIX}"]
@property
def options(self) -> Dict[str, Any]:
def options(self) -> dict[str, Any]:
"""Return options with local changes."""
return {**self.data[ATTR_OPTIONS], **self.persist[ATTR_OPTIONS]}
@options.setter
def options(self, value: Optional[Dict[str, Any]]) -> None:
def options(self, value: Optional[dict[str, Any]]) -> None:
"""Store user add-on options."""
self.persist[ATTR_OPTIONS] = {} if value is None else deepcopy(value)
@ -274,12 +274,12 @@ class Addon(AddonModel):
self.persist[ATTR_PROTECTED] = value
@property
def ports(self) -> Optional[Dict[str, Optional[int]]]:
def ports(self) -> Optional[dict[str, Optional[int]]]:
"""Return ports of add-on."""
return self.persist.get(ATTR_NETWORK, super().ports)
@ports.setter
def ports(self, value: Optional[Dict[str, Optional[int]]]) -> None:
def ports(self, value: Optional[dict[str, Optional[int]]]) -> None:
"""Set custom ports of add-on."""
if value is None:
self.persist.pop(ATTR_NETWORK, None)
@ -425,7 +425,7 @@ class Addon(AddonModel):
return Path(self.sys_config.path_extern_tmp, f"{self.slug}_pulse")
@property
def devices(self) -> Set[Device]:
def devices(self) -> set[Device]:
"""Extract devices from add-on options."""
options_schema = self.schema
with suppress(vol.Invalid):
@ -434,7 +434,7 @@ class Addon(AddonModel):
return options_schema.devices
@property
def pwned(self) -> Set[str]:
def pwned(self) -> set[str]:
"""Extract pwned data for add-on options."""
options_schema = self.schema
with suppress(vol.Invalid):

View File

@ -2,7 +2,7 @@
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING, Dict
from typing import TYPE_CHECKING
from awesomeversion import AwesomeVersion
@ -60,12 +60,12 @@ class AddonBuild(FileConfiguration, CoreSysAttributes):
return self._data[ATTR_SQUASH]
@property
def additional_args(self) -> Dict[str, str]:
def additional_args(self) -> dict[str, str]:
"""Return additional Docker build arguments."""
return self._data[ATTR_ARGS]
@property
def additional_labels(self) -> Dict[str, str]:
def additional_labels(self) -> dict[str, str]:
"""Return additional Docker labels."""
return self._data[ATTR_LABELS]

View File

@ -1,6 +1,6 @@
"""Init file for Supervisor add-on data."""
from copy import deepcopy
from typing import Any, Dict
from typing import Any
from ..const import (
ATTR_IMAGE,
@ -16,7 +16,7 @@ from ..utils.common import FileConfiguration
from .addon import Addon
from .validate import SCHEMA_ADDONS_FILE
Config = Dict[str, Any]
Config = dict[str, Any]
class AddonsData(FileConfiguration, CoreSysAttributes):

View File

@ -1,7 +1,7 @@
"""Init file for Supervisor add-ons."""
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Awaitable, Dict, List, Optional
from typing import Any, Awaitable, Optional
from awesomeversion import AwesomeVersion, AwesomeVersionException
@ -83,7 +83,7 @@ from .const import ATTR_BACKUP
from .options import AddonOptions, UiOptions
from .validate import RE_SERVICE, RE_VOLUME
Data = Dict[str, Any]
Data = dict[str, Any]
class AddonModel(CoreSysAttributes, ABC):
@ -115,7 +115,7 @@ class AddonModel(CoreSysAttributes, ABC):
return self._available(self.data)
@property
def options(self) -> Dict[str, Any]:
def options(self) -> dict[str, Any]:
"""Return options with local changes."""
return self.data[ATTR_OPTIONS]
@ -140,7 +140,7 @@ class AddonModel(CoreSysAttributes, ABC):
return self.slug.replace("_", "-")
@property
def dns(self) -> List[str]:
def dns(self) -> list[str]:
"""Return list of DNS name for that add-on."""
return []
@ -228,7 +228,7 @@ class AddonModel(CoreSysAttributes, ABC):
return self.data[ATTR_STAGE]
@property
def services_role(self) -> Dict[str, str]:
def services_role(self) -> dict[str, str]:
"""Return dict of services with rights."""
services_list = self.data.get(ATTR_SERVICES, [])
@ -241,17 +241,17 @@ class AddonModel(CoreSysAttributes, ABC):
return services
@property
def discovery(self) -> List[str]:
def discovery(self) -> list[str]:
"""Return list of discoverable components/platforms."""
return self.data.get(ATTR_DISCOVERY, [])
@property
def ports_description(self) -> Optional[Dict[str, str]]:
def ports_description(self) -> Optional[dict[str, str]]:
"""Return descriptions of ports."""
return self.data.get(ATTR_PORTS_DESCRIPTION)
@property
def ports(self) -> Optional[Dict[str, Optional[int]]]:
def ports(self) -> Optional[dict[str, Optional[int]]]:
"""Return ports of add-on."""
return self.data.get(ATTR_PORTS)
@ -311,17 +311,17 @@ class AddonModel(CoreSysAttributes, ABC):
return self.data[ATTR_HOST_DBUS]
@property
def static_devices(self) -> List[Path]:
def static_devices(self) -> list[Path]:
"""Return static devices of add-on."""
return [Path(node) for node in self.data.get(ATTR_DEVICES, [])]
@property
def environment(self) -> Optional[Dict[str, str]]:
def environment(self) -> Optional[dict[str, str]]:
"""Return environment of add-on."""
return self.data.get(ATTR_ENVIRONMENT)
@property
def privileged(self) -> List[Capabilities]:
def privileged(self) -> list[Capabilities]:
"""Return list of privilege."""
return self.data.get(ATTR_PRIVILEGED, [])
@ -360,7 +360,7 @@ class AddonModel(CoreSysAttributes, ABC):
return self.data[ATTR_HASSIO_ROLE]
@property
def backup_exclude(self) -> List[str]:
def backup_exclude(self) -> list[str]:
"""Return Exclude list for backup."""
return self.data.get(ATTR_BACKUP_EXCLUDE, [])
@ -495,12 +495,12 @@ class AddonModel(CoreSysAttributes, ABC):
return self.path_documentation.exists()
@property
def supported_arch(self) -> List[str]:
def supported_arch(self) -> list[str]:
"""Return list of supported arch."""
return self.data[ATTR_ARCH]
@property
def supported_machine(self) -> List[str]:
def supported_machine(self) -> list[str]:
"""Return list of supported machine."""
return self.data.get(ATTR_MACHINE, [])
@ -515,7 +515,7 @@ class AddonModel(CoreSysAttributes, ABC):
return ATTR_IMAGE not in self.data
@property
def map_volumes(self) -> Dict[str, str]:
def map_volumes(self) -> dict[str, str]:
"""Return a dict of {volume: policy} from add-on."""
volumes = {}
for volume in self.data[ATTR_MAP]:
@ -566,7 +566,7 @@ class AddonModel(CoreSysAttributes, ABC):
return AddonOptions(self.coresys, raw_schema, self.name, self.slug)
@property
def schema_ui(self) -> Optional[List[Dict[any, any]]]:
def schema_ui(self) -> Optional[list[dict[any, any]]]:
"""Create a UI schema for add-on options."""
raw_schema = self.data[ATTR_SCHEMA]

View File

@ -3,7 +3,7 @@ import hashlib
import logging
from pathlib import Path
import re
from typing import Any, Dict, List, Set, Union
from typing import Any, Union
import voluptuous as vol
@ -59,13 +59,13 @@ class AddonOptions(CoreSysAttributes):
"""Validate Add-ons Options."""
def __init__(
self, coresys: CoreSys, raw_schema: Dict[str, Any], name: str, slug: str
self, coresys: CoreSys, raw_schema: dict[str, Any], name: str, slug: str
):
"""Validate schema."""
self.coresys: CoreSys = coresys
self.raw_schema: Dict[str, Any] = raw_schema
self.devices: Set[Device] = set()
self.pwned: Set[str] = set()
self.raw_schema: dict[str, Any] = raw_schema
self.devices: set[Device] = set()
self.pwned: set[str] = set()
self._name = name
self._slug = slug
@ -187,7 +187,7 @@ class AddonOptions(CoreSysAttributes):
f"Fatal error for option '{key}' with type '{typ}' in {self._name} ({self._slug})"
) from None
def _nested_validate_list(self, typ: Any, data_list: List[Any], key: str):
def _nested_validate_list(self, typ: Any, data_list: list[Any], key: str):
"""Validate nested items."""
options = []
@ -209,7 +209,7 @@ class AddonOptions(CoreSysAttributes):
return options
def _nested_validate_dict(
self, typ: Dict[Any, Any], data_dict: Dict[Any, Any], key: str
self, typ: dict[Any, Any], data_dict: dict[Any, Any], key: str
):
"""Validate nested items."""
options = {}
@ -241,7 +241,7 @@ class AddonOptions(CoreSysAttributes):
return options
def _check_missing_options(
self, origin: Dict[Any, Any], exists: Dict[Any, Any], root: str
self, origin: dict[Any, Any], exists: dict[Any, Any], root: str
) -> None:
"""Check if all options are exists."""
missing = set(origin) - set(exists)
@ -267,9 +267,9 @@ class UiOptions(CoreSysAttributes):
"""Initialize UI option render."""
self.coresys = coresys
def __call__(self, raw_schema: Dict[str, Any]) -> List[Dict[str, Any]]:
def __call__(self, raw_schema: dict[str, Any]) -> list[dict[str, Any]]:
"""Generate UI schema."""
ui_schema: List[Dict[str, Any]] = []
ui_schema: list[dict[str, Any]] = []
# read options
for key, value in raw_schema.items():
@ -287,13 +287,13 @@ class UiOptions(CoreSysAttributes):
def _single_ui_option(
self,
ui_schema: List[Dict[str, Any]],
ui_schema: list[dict[str, Any]],
value: str,
key: str,
multiple: bool = False,
) -> None:
"""Validate a single element."""
ui_node: Dict[str, Union[str, bool, float, List[str]]] = {"name": key}
ui_node: dict[str, Union[str, bool, float, list[str]]] = {"name": key}
# If multiple
if multiple:
@ -365,8 +365,8 @@ class UiOptions(CoreSysAttributes):
def _nested_ui_list(
self,
ui_schema: List[Dict[str, Any]],
option_list: List[Any],
ui_schema: list[dict[str, Any]],
option_list: list[Any],
key: str,
) -> None:
"""UI nested list items."""
@ -383,8 +383,8 @@ class UiOptions(CoreSysAttributes):
def _nested_ui_dict(
self,
ui_schema: List[Dict[str, Any]],
option_dict: Dict[str, Any],
ui_schema: list[dict[str, Any]],
option_dict: dict[str, Any],
key: str,
multiple: bool = False,
) -> None:
@ -408,7 +408,7 @@ class UiOptions(CoreSysAttributes):
ui_schema.append(ui_node)
def _create_device_filter(str_filter: str) -> Dict[str, Any]:
def _create_device_filter(str_filter: str) -> dict[str, Any]:
"""Generate device Filter."""
raw_filter = dict(value.split("=") for value in str_filter.split(";"))

View File

@ -2,7 +2,7 @@
import logging
import re
import secrets
from typing import Any, Dict
from typing import Any
import uuid
import voluptuous as vol
@ -148,7 +148,7 @@ RE_MACHINE = re.compile(
)
def _warn_addon_config(config: Dict[str, Any]):
def _warn_addon_config(config: dict[str, Any]):
"""Warn about miss configs."""
name = config.get(ATTR_NAME)
if not name:
@ -179,7 +179,7 @@ def _warn_addon_config(config: Dict[str, Any]):
def _migrate_addon_config(protocol=False):
"""Migrate addon config."""
def _migrate(config: Dict[str, Any]):
def _migrate(config: dict[str, Any]):
name = config.get(ATTR_NAME)
if not name:
raise vol.Invalid("Invalid Add-on config!")

View File

@ -1,7 +1,7 @@
"""Init file for Supervisor Home Assistant RESTful API."""
import asyncio
import logging
from typing import Any, Awaitable, Dict, List
from typing import Any, Awaitable
from aiohttp import web
import voluptuous as vol
@ -156,7 +156,7 @@ class APIAddons(CoreSysAttributes):
return addon
@api_process
async def list(self, request: web.Request) -> Dict[str, Any]:
async def list(self, request: web.Request) -> dict[str, Any]:
"""Return all add-ons or repositories."""
data_addons = [
{
@ -201,7 +201,7 @@ class APIAddons(CoreSysAttributes):
await asyncio.shield(self.sys_store.reload())
@api_process
async def info(self, request: web.Request) -> Dict[str, Any]:
async def info(self, request: web.Request) -> dict[str, Any]:
"""Return add-on information."""
addon: AnyAddon = self._extract_addon(request)
@ -390,7 +390,7 @@ class APIAddons(CoreSysAttributes):
async def security(self, request: web.Request) -> None:
"""Store security options for add-on."""
addon = self._extract_addon_installed(request)
body: Dict[str, Any] = await api_validate(SCHEMA_SECURITY, request)
body: dict[str, Any] = await api_validate(SCHEMA_SECURITY, request)
if ATTR_PROTECTED in body:
_LOGGER.warning("Changing protected flag for %s!", addon.slug)
@ -399,7 +399,7 @@ class APIAddons(CoreSysAttributes):
addon.save_persist()
@api_process
async def stats(self, request: web.Request) -> Dict[str, Any]:
async def stats(self, request: web.Request) -> dict[str, Any]:
"""Return resource information."""
addon = self._extract_addon_installed(request)
@ -503,6 +503,6 @@ class APIAddons(CoreSysAttributes):
await asyncio.shield(addon.write_stdin(data))
def _pretty_services(addon: AnyAddon) -> List[str]:
def _pretty_services(addon: AnyAddon) -> list[str]:
"""Return a simplified services role list."""
return [f"{name}:{access}" for name, access in addon.services_role.items()]

View File

@ -1,7 +1,7 @@
"""Init file for Supervisor Audio RESTful API."""
import asyncio
import logging
from typing import Any, Awaitable, Dict
from typing import Any, Awaitable
from aiohttp import web
import attr
@ -67,7 +67,7 @@ class APIAudio(CoreSysAttributes):
"""Handle RESTful API for Audio functions."""
@api_process
async def info(self, request: web.Request) -> Dict[str, Any]:
async def info(self, request: web.Request) -> dict[str, Any]:
"""Return Audio information."""
return {
ATTR_VERSION: self.sys_plugins.audio.version,
@ -89,7 +89,7 @@ class APIAudio(CoreSysAttributes):
}
@api_process
async def stats(self, request: web.Request) -> Dict[str, Any]:
async def stats(self, request: web.Request) -> dict[str, Any]:
"""Return resource information."""
stats = await self.sys_plugins.audio.stats()

View File

@ -1,7 +1,6 @@
"""Init file for Supervisor auth/SSO RESTful API."""
import asyncio
import logging
from typing import Dict
from aiohttp import BasicAuth, web
from aiohttp.hdrs import AUTHORIZATION, CONTENT_TYPE, WWW_AUTHENTICATE
@ -29,7 +28,7 @@ SCHEMA_PASSWORD_RESET = vol.Schema(
}
)
REALM_HEADER: Dict[str, str] = {
REALM_HEADER: dict[str, str] = {
WWW_AUTHENTICATE: 'Basic realm="Home Assistant Authentication"'
}
@ -46,7 +45,7 @@ class APIAuth(CoreSysAttributes):
return self.sys_auth.check_login(addon, auth.login, auth.password)
def _process_dict(
self, request: web.Request, addon: Addon, data: Dict[str, str]
self, request: web.Request, addon: Addon, data: dict[str, str]
) -> bool:
"""Process login with dict data.
@ -86,7 +85,7 @@ class APIAuth(CoreSysAttributes):
@api_process
async def reset(self, request: web.Request) -> None:
"""Process reset password request."""
body: Dict[str, str] = await api_validate(SCHEMA_PASSWORD_RESET, request)
body: dict[str, str] = await api_validate(SCHEMA_PASSWORD_RESET, request)
await asyncio.shield(
self.sys_auth.change_password(body[ATTR_USERNAME], body[ATTR_PASSWORD])
)

View File

@ -1,7 +1,7 @@
"""Init file for Supervisor HA cli RESTful API."""
import asyncio
import logging
from typing import Any, Dict
from typing import Any
from aiohttp import web
import voluptuous as vol
@ -32,7 +32,7 @@ class APICli(CoreSysAttributes):
"""Handle RESTful API for HA Cli functions."""
@api_process
async def info(self, request: web.Request) -> Dict[str, Any]:
async def info(self, request: web.Request) -> dict[str, Any]:
"""Return HA cli information."""
return {
ATTR_VERSION: self.sys_plugins.cli.version,
@ -41,7 +41,7 @@ class APICli(CoreSysAttributes):
}
@api_process
async def stats(self, request: web.Request) -> Dict[str, Any]:
async def stats(self, request: web.Request) -> dict[str, Any]:
"""Return resource information."""
stats = await self.sys_plugins.cli.stats()

View File

@ -1,7 +1,7 @@
"""Init file for Supervisor DNS RESTful API."""
import asyncio
import logging
from typing import Any, Awaitable, Dict
from typing import Any, Awaitable
from aiohttp import web
import voluptuous as vol
@ -40,7 +40,7 @@ class APICoreDNS(CoreSysAttributes):
"""Handle RESTful API for DNS functions."""
@api_process
async def info(self, request: web.Request) -> Dict[str, Any]:
async def info(self, request: web.Request) -> dict[str, Any]:
"""Return DNS information."""
return {
ATTR_VERSION: self.sys_plugins.dns.version,
@ -63,7 +63,7 @@ class APICoreDNS(CoreSysAttributes):
self.sys_plugins.dns.save_data()
@api_process
async def stats(self, request: web.Request) -> Dict[str, Any]:
async def stats(self, request: web.Request) -> dict[str, Any]:
"""Return resource information."""
stats = await self.sys_plugins.dns.stats()

View File

@ -1,6 +1,6 @@
"""Init file for Supervisor Home Assistant RESTful API."""
import logging
from typing import Any, Dict
from typing import Any
from aiohttp import web
import voluptuous as vol
@ -33,7 +33,7 @@ class APIDocker(CoreSysAttributes):
"""Handle RESTful API for Docker configuration."""
@api_process
async def registries(self, request) -> Dict[str, Any]:
async def registries(self, request) -> dict[str, Any]:
"""Return the list of registries."""
data_registries = {}
for hostname, registry in self.sys_docker.config.registries.items():

View File

@ -1,6 +1,6 @@
"""Init file for Supervisor hardware RESTful API."""
import logging
from typing import Any, Dict
from typing import Any
from aiohttp import web
@ -19,7 +19,7 @@ from .utils import api_process
_LOGGER: logging.Logger = logging.getLogger(__name__)
def device_struct(device: Device) -> Dict[str, Any]:
def device_struct(device: Device) -> dict[str, Any]:
"""Return a dict with information of a interface to be used in th API."""
return {
ATTR_NAME: device.name,
@ -35,7 +35,7 @@ class APIHardware(CoreSysAttributes):
"""Handle RESTful API for hardware functions."""
@api_process
async def info(self, request: web.Request) -> Dict[str, Any]:
async def info(self, request: web.Request) -> dict[str, Any]:
"""Show hardware info."""
return {
ATTR_DEVICES: [
@ -44,7 +44,7 @@ class APIHardware(CoreSysAttributes):
}
@api_process
async def audio(self, request: web.Request) -> Dict[str, Any]:
async def audio(self, request: web.Request) -> dict[str, Any]:
"""Show pulse audio profiles."""
return {
ATTR_AUDIO: {

View File

@ -1,7 +1,7 @@
"""Init file for Supervisor Home Assistant RESTful API."""
import asyncio
import logging
from typing import Any, Awaitable, Dict
from typing import Any, Awaitable
from aiohttp import web
import voluptuous as vol
@ -61,7 +61,7 @@ class APIHomeAssistant(CoreSysAttributes):
"""Handle RESTful API for Home Assistant functions."""
@api_process
async def info(self, request: web.Request) -> Dict[str, Any]:
async def info(self, request: web.Request) -> dict[str, Any]:
"""Return host information."""
return {
ATTR_VERSION: self.sys_homeassistant.version,
@ -117,7 +117,7 @@ class APIHomeAssistant(CoreSysAttributes):
self.sys_homeassistant.save_data()
@api_process
async def stats(self, request: web.Request) -> Dict[Any, str]:
async def stats(self, request: web.Request) -> dict[Any, str]:
"""Return resource information."""
stats = await self.sys_homeassistant.core.stats()
if not stats:

View File

@ -1,6 +1,6 @@
"""Init file for Supervisor info RESTful API."""
import logging
from typing import Any, Dict
from typing import Any
from aiohttp import web
@ -31,7 +31,7 @@ class APIInfo(CoreSysAttributes):
"""Handle RESTful API for info functions."""
@api_process
async def info(self, request: web.Request) -> Dict[str, Any]:
async def info(self, request: web.Request) -> dict[str, Any]:
"""Show system info."""
return {
ATTR_SUPERVISOR: self.sys_supervisor.version,

View File

@ -2,7 +2,7 @@
import asyncio
from ipaddress import ip_address
import logging
from typing import Any, Dict, Union
from typing import Any, Union
import aiohttp
from aiohttp import ClientTimeout, hdrs, web
@ -54,7 +54,7 @@ class APIIngress(CoreSysAttributes):
return f"http://{addon.ip_address}:{addon.ingress_port}/{path}"
@api_process
async def panels(self, request: web.Request) -> Dict[str, Any]:
async def panels(self, request: web.Request) -> dict[str, Any]:
"""Create a list of panel data."""
addons = {}
for addon in self.sys_ingress.addons:
@ -69,14 +69,14 @@ class APIIngress(CoreSysAttributes):
@api_process
@require_home_assistant
async def create_session(self, request: web.Request) -> Dict[str, Any]:
async def create_session(self, request: web.Request) -> dict[str, Any]:
"""Create a new session."""
session = self.sys_ingress.create_session()
return {ATTR_SESSION: session}
@api_process
@require_home_assistant
async def validate_session(self, request: web.Request) -> Dict[str, Any]:
async def validate_session(self, request: web.Request) -> dict[str, Any]:
"""Validate session and extending how long it's valid for."""
data = await api_validate(VALIDATE_SESSION_DATA, request)
@ -220,7 +220,7 @@ class APIIngress(CoreSysAttributes):
def _init_header(
request: web.Request, addon: str
) -> Union[CIMultiDict, Dict[str, str]]:
) -> Union[CIMultiDict, dict[str, str]]:
"""Create initial header."""
headers = {}
@ -248,7 +248,7 @@ def _init_header(
return headers
def _response_header(response: aiohttp.ClientResponse) -> Dict[str, str]:
def _response_header(response: aiohttp.ClientResponse) -> dict[str, str]:
"""Create response header."""
headers = {}

View File

@ -1,6 +1,6 @@
"""Init file for Supervisor Jobs RESTful API."""
import logging
from typing import Any, Dict
from typing import Any
from aiohttp import web
import voluptuous as vol
@ -20,7 +20,7 @@ class APIJobs(CoreSysAttributes):
"""Handle RESTful API for OS functions."""
@api_process
async def info(self, request: web.Request) -> Dict[str, Any]:
async def info(self, request: web.Request) -> dict[str, Any]:
"""Return JobManager information."""
return {
ATTR_IGNORE_CONDITIONS: self.sys_jobs.ignore_conditions,

View File

@ -1,7 +1,7 @@
"""Init file for Supervisor Multicast RESTful API."""
import asyncio
import logging
from typing import Any, Awaitable, Dict
from typing import Any, Awaitable
from aiohttp import web
import voluptuous as vol
@ -34,7 +34,7 @@ class APIMulticast(CoreSysAttributes):
"""Handle RESTful API for Multicast functions."""
@api_process
async def info(self, request: web.Request) -> Dict[str, Any]:
async def info(self, request: web.Request) -> dict[str, Any]:
"""Return Multicast information."""
return {
ATTR_VERSION: self.sys_plugins.multicast.version,
@ -43,7 +43,7 @@ class APIMulticast(CoreSysAttributes):
}
@api_process
async def stats(self, request: web.Request) -> Dict[str, Any]:
async def stats(self, request: web.Request) -> dict[str, Any]:
"""Return resource information."""
stats = await self.sys_plugins.multicast.stats()

View File

@ -1,7 +1,7 @@
"""REST API for network."""
import asyncio
from ipaddress import ip_address, ip_interface
from typing import Any, Awaitable, Dict
from typing import Any, Awaitable
from aiohttp import web
import attr
@ -82,7 +82,7 @@ SCHEMA_UPDATE = vol.Schema(
)
def ipconfig_struct(config: IpConfig) -> Dict[str, Any]:
def ipconfig_struct(config: IpConfig) -> dict[str, Any]:
"""Return a dict with information about ip configuration."""
return {
ATTR_METHOD: config.method,
@ -92,7 +92,7 @@ def ipconfig_struct(config: IpConfig) -> Dict[str, Any]:
}
def wifi_struct(config: WifiConfig) -> Dict[str, Any]:
def wifi_struct(config: WifiConfig) -> dict[str, Any]:
"""Return a dict with information about wifi configuration."""
return {
ATTR_MODE: config.mode,
@ -102,7 +102,7 @@ def wifi_struct(config: WifiConfig) -> Dict[str, Any]:
}
def vlan_struct(config: VlanConfig) -> Dict[str, Any]:
def vlan_struct(config: VlanConfig) -> dict[str, Any]:
"""Return a dict with information about VLAN configuration."""
return {
ATTR_ID: config.id,
@ -110,7 +110,7 @@ def vlan_struct(config: VlanConfig) -> Dict[str, Any]:
}
def interface_struct(interface: Interface) -> Dict[str, Any]:
def interface_struct(interface: Interface) -> dict[str, Any]:
"""Return a dict with information of a interface to be used in th API."""
return {
ATTR_INTERFACE: interface.name,
@ -125,7 +125,7 @@ def interface_struct(interface: Interface) -> Dict[str, Any]:
}
def accesspoint_struct(accesspoint: AccessPoint) -> Dict[str, Any]:
def accesspoint_struct(accesspoint: AccessPoint) -> dict[str, Any]:
"""Return a dict for AccessPoint."""
return {
ATTR_MODE: accesspoint.mode,
@ -158,7 +158,7 @@ class APINetwork(CoreSysAttributes):
raise APIError(f"Interface {name} does not exist") from None
@api_process
async def info(self, request: web.Request) -> Dict[str, Any]:
async def info(self, request: web.Request) -> dict[str, Any]:
"""Return network information."""
return {
ATTR_INTERFACES: [
@ -176,7 +176,7 @@ class APINetwork(CoreSysAttributes):
}
@api_process
async def interface_info(self, request: web.Request) -> Dict[str, Any]:
async def interface_info(self, request: web.Request) -> dict[str, Any]:
"""Return network information for a interface."""
interface = self._get_interface(request.match_info.get(ATTR_INTERFACE))
@ -223,7 +223,7 @@ class APINetwork(CoreSysAttributes):
return asyncio.shield(self.sys_host.network.update())
@api_process
async def scan_accesspoints(self, request: web.Request) -> Dict[str, Any]:
async def scan_accesspoints(self, request: web.Request) -> dict[str, Any]:
"""Scan and return a list of available networks."""
interface = self._get_interface(request.match_info.get(ATTR_INTERFACE))

View File

@ -1,7 +1,7 @@
"""Init file for Supervisor Observer RESTful API."""
import asyncio
import logging
from typing import Any, Dict
from typing import Any
from aiohttp import web
import voluptuous as vol
@ -33,7 +33,7 @@ class APIObserver(CoreSysAttributes):
"""Handle RESTful API for Observer functions."""
@api_process
async def info(self, request: web.Request) -> Dict[str, Any]:
async def info(self, request: web.Request) -> dict[str, Any]:
"""Return HA Observer information."""
return {
ATTR_HOST: str(self.sys_docker.network.observer),
@ -43,7 +43,7 @@ class APIObserver(CoreSysAttributes):
}
@api_process
async def stats(self, request: web.Request) -> Dict[str, Any]:
async def stats(self, request: web.Request) -> dict[str, Any]:
"""Return resource information."""
stats = await self.sys_plugins.observer.stats()

View File

@ -2,7 +2,7 @@
import asyncio
import logging
from pathlib import Path
from typing import Any, Awaitable, Dict
from typing import Any, Awaitable
from aiohttp import web
import voluptuous as vol
@ -30,7 +30,7 @@ class APIOS(CoreSysAttributes):
"""Handle RESTful API for OS functions."""
@api_process
async def info(self, request: web.Request) -> Dict[str, Any]:
async def info(self, request: web.Request) -> dict[str, Any]:
"""Return OS information."""
return {
ATTR_VERSION: self.sys_os.version,
@ -62,7 +62,7 @@ class APIOS(CoreSysAttributes):
await asyncio.shield(self.sys_os.datadisk.migrate_disk(body[ATTR_DEVICE]))
@api_process
async def list_data(self, request: web.Request) -> Dict[str, Any]:
async def list_data(self, request: web.Request) -> dict[str, Any]:
"""Return possible data targets."""
return {
ATTR_DEVICES: self.sys_os.datadisk.available_disks,

View File

@ -1,6 +1,6 @@
"""Handle REST API for resoulution."""
import asyncio
from typing import Any, Awaitable, Dict
from typing import Any, Awaitable
from aiohttp import web
import attr
@ -26,7 +26,7 @@ class APIResoulution(CoreSysAttributes):
"""Handle REST API for resoulution."""
@api_process
async def info(self, request: web.Request) -> Dict[str, Any]:
async def info(self, request: web.Request) -> dict[str, Any]:
"""Return network information."""
return {
ATTR_UNSUPPORTED: self.sys_resolution.unsupported,

View File

@ -1,6 +1,6 @@
"""Init file for Supervisor Security RESTful API."""
import logging
from typing import Any, Dict
from typing import Any
from aiohttp import web
import voluptuous as vol
@ -25,7 +25,7 @@ class APISecurity(CoreSysAttributes):
"""Handle RESTful API for Security functions."""
@api_process
async def info(self, request: web.Request) -> Dict[str, Any]:
async def info(self, request: web.Request) -> dict[str, Any]:
"""Return Security information."""
return {
ATTR_CONTENT_TRUST: self.sys_security.content_trust,

View File

@ -1,6 +1,6 @@
"""Init file for Supervisor Home Assistant RESTful API."""
import asyncio
from typing import Any, Awaitable, Dict, List
from typing import Any, Awaitable
from aiohttp import web
@ -67,7 +67,7 @@ class APIStore(CoreSysAttributes):
return repository
def _generate_addon_information(self, addon: AddonStore) -> Dict[str, Any]:
def _generate_addon_information(self, addon: AddonStore) -> dict[str, Any]:
"""Generate addon information."""
return {
ATTR_ADVANCED: addon.advanced,
@ -90,7 +90,7 @@ class APIStore(CoreSysAttributes):
def _generate_repository_information(
self, repository: Repository
) -> Dict[str, Any]:
) -> dict[str, Any]:
"""Generate repository information."""
return {
ATTR_SLUG: repository.slug,
@ -106,7 +106,7 @@ class APIStore(CoreSysAttributes):
await asyncio.shield(self.sys_store.reload())
@api_process
async def store_info(self, request: web.Request) -> Dict[str, Any]:
async def store_info(self, request: web.Request) -> dict[str, Any]:
"""Return store information."""
return {
ATTR_ADDONS: [
@ -120,7 +120,7 @@ class APIStore(CoreSysAttributes):
}
@api_process
async def addons_list(self, request: web.Request) -> List[Dict[str, Any]]:
async def addons_list(self, request: web.Request) -> list[dict[str, Any]]:
"""Return all store add-ons."""
return [
self._generate_addon_information(self.sys_addons.store[addon])
@ -142,13 +142,13 @@ class APIStore(CoreSysAttributes):
return asyncio.shield(addon.update())
@api_process
async def addons_addon_info(self, request: web.Request) -> Dict[str, Any]:
async def addons_addon_info(self, request: web.Request) -> dict[str, Any]:
"""Return add-on information."""
addon: AddonStore = self._extract_addon(request)
return self._generate_addon_information(addon)
@api_process
async def repositories_list(self, request: web.Request) -> List[Dict[str, Any]]:
async def repositories_list(self, request: web.Request) -> list[dict[str, Any]]:
"""Return all repositories."""
return [
self._generate_repository_information(repository)
@ -158,7 +158,7 @@ class APIStore(CoreSysAttributes):
@api_process
async def repositories_repository_info(
self, request: web.Request
) -> Dict[str, Any]:
) -> dict[str, Any]:
"""Return repository information."""
repository: Repository = self._extract_repository(request)
return self._generate_repository_information(repository)

View File

@ -1,7 +1,7 @@
"""Init file for Supervisor Supervisor RESTful API."""
import asyncio
import logging
from typing import Any, Awaitable, Dict
from typing import Any, Awaitable
from aiohttp import web
import voluptuous as vol
@ -82,7 +82,7 @@ class APISupervisor(CoreSysAttributes):
return True
@api_process
async def info(self, request: web.Request) -> Dict[str, Any]:
async def info(self, request: web.Request) -> dict[str, Any]:
"""Return host information."""
list_addons = []
for addon in self.sys_addons.installed:
@ -178,7 +178,7 @@ class APISupervisor(CoreSysAttributes):
await self.sys_resolution.evaluate.evaluate_system()
@api_process
async def stats(self, request: web.Request) -> Dict[str, Any]:
async def stats(self, request: web.Request) -> dict[str, Any]:
"""Return resource information."""
stats = await self.sys_supervisor.stats()

View File

@ -1,6 +1,6 @@
"""Init file for Supervisor util for RESTful API."""
import json
from typing import Any, Dict, List, Optional
from typing import Any, Optional
from aiohttp import web
from aiohttp.hdrs import AUTHORIZATION
@ -46,7 +46,7 @@ def excract_supervisor_token(request: web.Request) -> Optional[str]:
return None
def json_loads(data: Any) -> Dict[str, Any]:
def json_loads(data: Any) -> dict[str, Any]:
"""Extract json from string with support for '' and None."""
if not data:
return {}
@ -135,7 +135,7 @@ def api_return_error(
)
def api_return_ok(data: Optional[Dict[str, Any]] = None) -> web.Response:
def api_return_ok(data: Optional[dict[str, Any]] = None) -> web.Response:
"""Return an API ok answer."""
return web.json_response(
{JSON_RESULT: RESULT_OK, JSON_DATA: data or {}},
@ -144,10 +144,10 @@ def api_return_ok(data: Optional[Dict[str, Any]] = None) -> web.Response:
async def api_validate(
schema: vol.Schema, request: web.Request, origin: Optional[List[str]] = None
) -> Dict[str, Any]:
schema: vol.Schema, request: web.Request, origin: Optional[list[str]] = None
) -> dict[str, Any]:
"""Validate request data with schema."""
data: Dict[str, Any] = await request.json(loads=json_loads)
data: dict[str, Any] = await request.json(loads=json_loads)
try:
data_validated = schema(data)
except vol.Invalid as ex:

View File

@ -2,7 +2,6 @@
import logging
from pathlib import Path
import platform
from typing import List
from .coresys import CoreSys, CoreSysAttributes
from .exceptions import ConfigurationFileError, HassioArchNotFound
@ -28,7 +27,7 @@ class CpuArch(CoreSysAttributes):
def __init__(self, coresys: CoreSys) -> None:
"""Initialize CPU Architecture handler."""
self.coresys = coresys
self._supported_arch: List[str] = []
self._supported_arch: list[str] = []
self._default_arch: str
@property
@ -42,7 +41,7 @@ class CpuArch(CoreSysAttributes):
return self.sys_supervisor.arch
@property
def supported(self) -> List[str]:
def supported(self) -> list[str]:
"""Return support arch by CPU/Machine."""
return self._supported_arch
@ -71,11 +70,11 @@ class CpuArch(CoreSysAttributes):
if native_support not in self._supported_arch:
self._supported_arch.append(native_support)
def is_supported(self, arch_list: List[str]) -> bool:
def is_supported(self, arch_list: list[str]) -> bool:
"""Return True if there is a supported arch by this platform."""
return not set(self.supported).isdisjoint(set(arch_list))
def match(self, arch_list: List[str]) -> str:
def match(self, arch_list: list[str]) -> str:
"""Return best match for this CPU/Platform."""
for self_arch in self.supported:
if self_arch in arch_list:

View File

@ -2,7 +2,7 @@
import asyncio
import hashlib
import logging
from typing import Dict, Optional
from typing import Optional
from .addons.addon import Addon
from .const import ATTR_ADDON, ATTR_PASSWORD, ATTR_USERNAME, FILE_HASSIO_AUTH
@ -22,7 +22,7 @@ class Auth(FileConfiguration, CoreSysAttributes):
super().__init__(FILE_HASSIO_AUTH, SCHEMA_AUTH_CONFIG)
self.coresys: CoreSys = coresys
self._running: Dict[str, asyncio.Task] = {}
self._running: dict[str, asyncio.Task] = {}
def _check_cache(self, username: str, password: str) -> Optional[bool]:
"""Check password in cache."""

View File

@ -5,7 +5,7 @@ import logging
from pathlib import Path
import tarfile
from tempfile import TemporaryDirectory
from typing import Any, Dict, List, Optional, Set
from typing import Any, Optional
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import padding
@ -70,7 +70,7 @@ class Backup(CoreSysAttributes):
"""Initialize a backup."""
self.coresys: CoreSys = coresys
self._tarfile: Path = tar_file
self._data: Dict[str, Any] = {}
self._data: dict[str, Any] = {}
self._tmp = None
self._key: Optional[bytes] = None
self._aes: Optional[Cipher] = None
@ -311,9 +311,9 @@ class Backup(CoreSysAttributes):
finally:
self._tmp.cleanup()
async def store_addons(self, addon_list: Optional[List[Addon]] = None):
async def store_addons(self, addon_list: Optional[list[Addon]] = None):
"""Add a list of add-ons into backup."""
addon_list: List[Addon] = addon_list or self.sys_addons.installed
addon_list: list[Addon] = addon_list or self.sys_addons.installed
async def _addon_save(addon: Addon):
"""Task to store an add-on into backup."""
@ -346,9 +346,9 @@ class Backup(CoreSysAttributes):
except Exception as err: # pylint: disable=broad-except
_LOGGER.warning("Can't save Add-on %s: %s", addon.slug, err)
async def restore_addons(self, addon_list: Optional[List[str]] = None):
async def restore_addons(self, addon_list: Optional[list[str]] = None):
"""Restore a list add-on from backup."""
addon_list: List[str] = addon_list or self.addon_list
addon_list: list[str] = addon_list or self.addon_list
async def _addon_restore(addon_slug: str):
"""Task to restore an add-on into backup."""
@ -375,9 +375,9 @@ class Backup(CoreSysAttributes):
except Exception as err: # pylint: disable=broad-except
_LOGGER.warning("Can't restore Add-on %s: %s", slug, err)
async def store_folders(self, folder_list: Optional[List[str]] = None):
async def store_folders(self, folder_list: Optional[list[str]] = None):
"""Backup Supervisor data into backup."""
folder_list: Set[str] = set(folder_list or ALL_FOLDERS)
folder_list: set[str] = set(folder_list or ALL_FOLDERS)
def _folder_save(name: str):
"""Take backup of a folder."""
@ -414,9 +414,9 @@ class Backup(CoreSysAttributes):
except Exception as err: # pylint: disable=broad-except
_LOGGER.warning("Can't save folder %s: %s", folder, err)
async def restore_folders(self, folder_list: Optional[List[str]] = None):
async def restore_folders(self, folder_list: Optional[list[str]] = None):
"""Backup Supervisor data into backup."""
folder_list: Set[str] = set(folder_list or self.folders)
folder_list: set[str] = set(folder_list or self.folders)
def _folder_restore(name: str):
"""Intenal function to restore a folder."""

View File

@ -2,7 +2,7 @@
import asyncio
import logging
from pathlib import Path
from typing import Awaitable, Set
from typing import Awaitable
from awesomeversion.awesomeversion import AwesomeVersion
from awesomeversion.exceptions import AwesomeVersionCompare
@ -29,7 +29,7 @@ class BackupManager(CoreSysAttributes):
self.lock = asyncio.Lock()
@property
def list_backups(self) -> Set[Backup]:
def list_backups(self) -> set[Backup]:
"""Return a list of all backup objects."""
return set(self._backups.values())

View File

@ -2,7 +2,7 @@
from __future__ import annotations
import logging
from typing import Any, Awaitable, Callable, Dict, List
from typing import Any, Awaitable, Callable
import attr
@ -18,7 +18,7 @@ class Bus(CoreSysAttributes):
def __init__(self, coresys: CoreSys):
"""Initialize bus backend."""
self.coresys = coresys
self._listeners: Dict[BusEvent, List[EventListener]] = {}
self._listeners: dict[BusEvent, list[EventListener]] = {}
def register_event(
self, event: BusEvent, callback: Callable[[Any], Awaitable[None]]

View File

@ -3,7 +3,7 @@ from datetime import datetime
import logging
import os
from pathlib import Path, PurePath
from typing import List, Optional
from typing import Optional
from awesomeversion import AwesomeVersion
@ -294,7 +294,7 @@ class CoreConfig(FileConfiguration):
return PurePath(self.path_extern_supervisor, MEDIA_DATA)
@property
def addons_repositories(self) -> List[str]:
def addons_repositories(self) -> list[str]:
"""Return list of custom Add-on repositories."""
return self._data[ATTR_ADDONS_CUSTOM_LIST]

View File

@ -3,7 +3,7 @@ import asyncio
from contextlib import suppress
from datetime import timedelta
import logging
from typing import Awaitable, List, Optional
from typing import Awaitable, Optional
import async_timeout
@ -105,7 +105,7 @@ class Core(CoreSysAttributes):
await self.sys_supervisor.check_connectivity()
# Order can be important!
setup_loads: List[Awaitable[None]] = [
setup_loads: list[Awaitable[None]] = [
# rest api views
self.sys_api.load(),
# Load Host Hardware

View File

@ -6,7 +6,7 @@ from datetime import datetime
import logging
import os
from types import MappingProxyType
from typing import TYPE_CHECKING, Any, Callable, Coroutine, Optional, TypeVar
from typing import TYPE_CHECKING, Any, Callable, Coroutine, TypeVar
import aiohttp
import sentry_sdk
@ -54,8 +54,8 @@ class CoreSys:
def __init__(self):
"""Initialize coresys."""
# Static attributes protected
self._machine_id: Optional[str] = None
self._machine: Optional[str] = None
self._machine_id: str | None = None
self._machine: str | None = None
# External objects
self._loop: asyncio.BaseEventLoop = asyncio.get_running_loop()
@ -66,30 +66,30 @@ class CoreSys:
self._docker: DockerAPI = DockerAPI()
# Internal objects pointers
self._core: Optional[Core] = None
self._arch: Optional[CpuArch] = None
self._auth: Optional[Auth] = None
self._homeassistant: Optional[HomeAssistant] = None
self._supervisor: Optional[Supervisor] = None
self._addons: Optional[AddonManager] = None
self._api: Optional[RestAPI] = None
self._updater: Optional[Updater] = None
self._backups: Optional[BackupManager] = None
self._tasks: Optional[Tasks] = None
self._host: Optional[HostManager] = None
self._ingress: Optional[Ingress] = None
self._dbus: Optional[DBusManager] = None
self._os: Optional[OSManager] = None
self._services: Optional[ServiceManager] = None
self._scheduler: Optional[Scheduler] = None
self._store: Optional[StoreManager] = None
self._discovery: Optional[Discovery] = None
self._hardware: Optional[HardwareManager] = None
self._plugins: Optional[PluginManager] = None
self._resolution: Optional[ResolutionManager] = None
self._jobs: Optional[JobManager] = None
self._security: Optional[Security] = None
self._bus: Optional[Bus] = None
self._core: Core | None = None
self._arch: CpuArch | None = None
self._auth: Auth | None = None
self._homeassistant: HomeAssistant | None = None
self._supervisor: Supervisor | None = None
self._addons: AddonManager | None = None
self._api: RestAPI | None = None
self._updater: Updater | None = None
self._backups: BackupManager | None = None
self._tasks: Tasks | None = None
self._host: HostManager | None = None
self._ingress: Ingress | None = None
self._dbus: DBusManager | None = None
self._os: OSManager | None = None
self._services: ServiceManager | None = None
self._scheduler: Scheduler | None = None
self._store: StoreManager | None = None
self._discovery: Discovery | None = None
self._hardware: HardwareManager | None = None
self._plugins: PluginManager | None = None
self._resolution: ResolutionManager | None = None
self._jobs: JobManager | None = None
self._security: Security | None = None
self._bus: Bus | None = None
# Set default header for aiohttp
self._websession._default_headers = MappingProxyType(
@ -467,7 +467,7 @@ class CoreSys:
self._jobs = value
@property
def machine(self) -> Optional[str]:
def machine(self) -> str | None:
"""Return machine type string."""
return self._machine
@ -479,7 +479,7 @@ class CoreSys:
self._machine = value
@property
def machine_id(self) -> Optional[str]:
def machine_id(self) -> str | None:
"""Return machine-id type string."""
return self._machine_id
@ -520,7 +520,7 @@ class CoreSysAttributes:
return self.coresys.timezone
@property
def sys_machine(self) -> Optional[str]:
def sys_machine(self) -> str | None:
"""Return running machine type of the Supervisor system."""
return self.coresys.machine

View File

@ -1,7 +1,7 @@
"""OS-Agent implementation for DBUS."""
import asyncio
import logging
from typing import Any, Dict
from typing import Any
from awesomeversion import AwesomeVersion
@ -30,7 +30,7 @@ class OSAgent(DBusInterface):
def __init__(self) -> None:
"""Initialize Properties."""
self.properties: Dict[str, Any] = {}
self.properties: dict[str, Any] = {}
self._cgroup: CGroup = CGroup()
self._apparmor: AppArmor = AppArmor()

View File

@ -1,6 +1,6 @@
"""AppArmor object for OS-Agent."""
from pathlib import Path
from typing import Any, Dict
from typing import Any
from awesomeversion import AwesomeVersion
@ -20,7 +20,7 @@ class AppArmor(DBusInterface):
def __init__(self) -> None:
"""Initialize Properties."""
self.properties: Dict[str, Any] = {}
self.properties: dict[str, Any] = {}
@property
@dbus_property

View File

@ -1,6 +1,6 @@
"""DataDisk object for OS-Agent."""
from pathlib import Path
from typing import Any, Dict
from typing import Any
from ...utils.gdbus import DBus
from ..const import (
@ -18,7 +18,7 @@ class DataDisk(DBusInterface):
def __init__(self) -> None:
"""Initialize Properties."""
self.properties: Dict[str, Any] = {}
self.properties: dict[str, Any] = {}
@property
@dbus_property

View File

@ -1,6 +1,6 @@
"""D-Bus interface for hostname."""
import logging
from typing import Any, Dict, Optional
from typing import Any, Optional
from ..exceptions import DBusError, DBusInterfaceError
from ..utils.gdbus import DBus
@ -27,7 +27,7 @@ class Hostname(DBusInterface):
def __init__(self):
"""Initialize Properties."""
self.properties: Dict[str, Any] = {}
self.properties: dict[str, Any] = {}
async def connect(self):
"""Connect to system's D-Bus."""

View File

@ -1,7 +1,7 @@
"""Interface class for D-Bus wrappers."""
from abc import ABC, abstractmethod
from functools import wraps
from typing import Any, Dict, Optional
from typing import Any, Optional
from ..utils.gdbus import DBus
@ -44,7 +44,7 @@ class DBusInterfaceProxy(ABC):
dbus: Optional[DBus] = None
object_path: Optional[str] = None
properties: Optional[Dict[str, Any]] = None
properties: Optional[dict[str, Any]] = None
@abstractmethod
async def connect(self):

View File

@ -1,6 +1,5 @@
"""D-Bus interface objects."""
import logging
from typing import List
from ..const import SOCKET_DBUS
from ..coresys import CoreSys, CoreSysAttributes
@ -74,7 +73,7 @@ class DBusManager(CoreSysAttributes):
)
return
dbus_loads: List[DBusInterface] = [
dbus_loads: list[DBusInterface] = [
self.agent,
self.systemd,
self.logind,

View File

@ -1,6 +1,6 @@
"""Network Manager implementation for DBUS."""
import logging
from typing import Any, Awaitable, Dict
from typing import Any, Awaitable
from awesomeversion import AwesomeVersion, AwesomeVersionException
import sentry_sdk
@ -42,9 +42,9 @@ class NetworkManager(DBusInterface):
"""Initialize Properties."""
self._dns: NetworkManagerDNS = NetworkManagerDNS()
self._settings: NetworkManagerSettings = NetworkManagerSettings()
self._interfaces: Dict[str, NetworkInterface] = {}
self._interfaces: dict[str, NetworkInterface] = {}
self.properties: Dict[str, Any] = {}
self.properties: dict[str, Any] = {}
@property
def dns(self) -> NetworkManagerDNS:
@ -57,7 +57,7 @@ class NetworkManager(DBusInterface):
return self._settings
@property
def interfaces(self) -> Dict[str, NetworkInterface]:
def interfaces(self) -> dict[str, NetworkInterface]:
"""Return a dictionary of active interfaces."""
return self._interfaces

View File

@ -1,6 +1,6 @@
"""NetworkConnection object4s for Network Manager."""
from ipaddress import IPv4Address, IPv4Interface, IPv6Address, IPv6Interface
from typing import List, Optional, Union
from typing import Optional, Union
import attr
@ -10,16 +10,16 @@ class IpConfiguration:
"""NetworkSettingsIPConfig object for Network Manager."""
gateway: Optional[Union[IPv6Address, IPv6Address]] = attr.ib()
nameservers: List[Union[IPv6Address, IPv6Address]] = attr.ib()
address: List[Union[IPv4Interface, IPv6Interface]] = attr.ib()
nameservers: list[Union[IPv6Address, IPv6Address]] = attr.ib()
address: list[Union[IPv4Interface, IPv6Interface]] = attr.ib()
@attr.s(slots=True)
class DNSConfiguration:
"""DNS configuration Object."""
nameservers: List[Union[IPv4Address, IPv6Address]] = attr.ib()
domains: List[str] = attr.ib()
nameservers: list[Union[IPv4Address, IPv6Address]] = attr.ib()
domains: list[str] = attr.ib()
interface: str = attr.ib()
priority: int = attr.ib()
vpn: bool = attr.ib()

View File

@ -1,7 +1,7 @@
"""D-Bus interface for hostname."""
from ipaddress import ip_address
import logging
from typing import List, Optional
from typing import Optional
from ...const import (
ATTR_DOMAINS,
@ -34,7 +34,7 @@ class NetworkManagerDNS(DBusInterface):
"""Initialize Properties."""
self._mode: Optional[str] = None
self._rc_manager: Optional[str] = None
self._configuration: List[DNSConfiguration] = []
self._configuration: list[DNSConfiguration] = []
@property
def mode(self) -> Optional[str]:
@ -47,7 +47,7 @@ class NetworkManagerDNS(DBusInterface):
return self._rc_manager
@property
def configuration(self) -> List[DNSConfiguration]:
def configuration(self) -> list[DNSConfiguration]:
"""Return Propertie configuraton."""
return self._configuration

View File

@ -4,7 +4,7 @@ from __future__ import annotations
from ipaddress import IPv4Address, IPv6Address
from pathlib import Path
import socket
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING
from uuid import uuid4
import jinja2
@ -21,7 +21,7 @@ INTERFACE_UPDATE_TEMPLATE: Path = (
def interface_update_payload(
interface: Interface, name: Optional[str] = None, uuid: Optional[str] = None
interface: Interface, name: str | None = None, uuid: str | None = None
) -> str:
"""Generate a payload for network interface update."""
env = jinja2.Environment()

View File

@ -1,7 +1,7 @@
"""Interface to systemd-timedate over D-Bus."""
from datetime import datetime
import logging
from typing import Any, Dict
from typing import Any
from ..exceptions import DBusError, DBusInterfaceError
from ..utils.dt import utc_from_timestamp
@ -28,7 +28,7 @@ class TimeDate(DBusInterface):
def __init__(self) -> None:
"""Initialize Properties."""
self.properties: Dict[str, Any] = {}
self.properties: dict[str, Any] = {}
@property
@dbus_property

View File

@ -3,7 +3,7 @@ from __future__ import annotations
from contextlib import suppress
import logging
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from typing import TYPE_CHECKING, Any
from uuid import UUID, uuid4
import attr
@ -31,7 +31,7 @@ class Message:
addon: str = attr.ib()
service: str = attr.ib()
config: Dict[str, Any] = attr.ib(eq=False)
config: dict[str, Any] = attr.ib(eq=False)
uuid: UUID = attr.ib(factory=lambda: uuid4().hex, eq=False)
@ -42,7 +42,7 @@ class Discovery(CoreSysAttributes, FileConfiguration):
"""Initialize discovery handler."""
super().__init__(FILE_HASSIO_DISCOVERY, SCHEMA_DISCOVERY_CONFIG)
self.coresys: CoreSys = coresys
self.message_obj: Dict[str, Message] = {}
self.message_obj: dict[str, Message] = {}
async def load(self) -> None:
"""Load exists discovery message into storage."""
@ -56,7 +56,7 @@ class Discovery(CoreSysAttributes, FileConfiguration):
def save(self) -> None:
"""Write discovery message into data file."""
messages: List[Dict[str, Any]] = []
messages: list[dict[str, Any]] = []
for message in self.list_messages:
messages.append(attr.asdict(message))
@ -64,16 +64,16 @@ class Discovery(CoreSysAttributes, FileConfiguration):
self._data[ATTR_DISCOVERY].extend(messages)
self.save_data()
def get(self, uuid: str) -> Optional[Message]:
def get(self, uuid: str) -> Message | None:
"""Return discovery message."""
return self.message_obj.get(uuid)
@property
def list_messages(self) -> List[Message]:
def list_messages(self) -> list[Message]:
"""Return list of available discovery messages."""
return list(self.message_obj.values())
def send(self, addon: Addon, service: str, config: Dict[str, Any]) -> Message:
def send(self, addon: Addon, service: str, config: dict[str, Any]) -> Message:
"""Send a discovery message to Home Assistant."""
try:
config = valid_discovery_config(service, config)

View File

@ -4,7 +4,7 @@ from ipaddress import IPv4Address
import logging
import os
from pathlib import Path
from typing import Any, Dict, Optional
from typing import Any, Optional
import attr
from awesomeversion import AwesomeVersion, AwesomeVersionCompare
@ -47,7 +47,7 @@ class DockerInfo:
logging: str = attr.ib()
@staticmethod
def new(data: Dict[str, Any]):
def new(data: dict[str, Any]):
"""Create a object from docker info."""
return DockerInfo(
AwesomeVersion(data["ServerVersion"]), data["Driver"], data["LoggingDriver"]
@ -77,7 +77,7 @@ class DockerConfig(FileConfiguration):
super().__init__(FILE_HASSIO_DOCKER, SCHEMA_DOCKER_CONFIG)
@property
def registries(self) -> Dict[str, Any]:
def registries(self) -> dict[str, Any]:
"""Return credentials for docker registries."""
return self._data.get(ATTR_REGISTRIES, {})
@ -91,7 +91,7 @@ class DockerAPI:
def __init__(self):
"""Initialize Docker base wrapper."""
self.docker: docker.DockerClient = docker.DockerClient(
base_url="unix:/{}".format(str(SOCKET_DOCKER)), version="auto", timeout=900
base_url=f"unix:/{str(SOCKET_DOCKER)}", version="auto", timeout=900
)
self.network: DockerNetwork = DockerNetwork(self.docker)
self._info: DockerInfo = DockerInfo.new(self.docker.info())

View File

@ -6,7 +6,7 @@ from ipaddress import IPv4Address, ip_address
import logging
import os
from pathlib import Path
from typing import TYPE_CHECKING, Awaitable, Dict, List, Optional, Set, Union
from typing import TYPE_CHECKING, Awaitable
from awesomeversion import AwesomeVersion
import docker
@ -55,7 +55,7 @@ class DockerAddon(DockerInterface):
self.addon = addon
@property
def image(self) -> Optional[str]:
def image(self) -> str | None:
"""Return name of Docker image."""
return self.addon.image
@ -98,7 +98,7 @@ class DockerAddon(DockerInterface):
return f"addon_{self.addon.slug}"
@property
def environment(self) -> Dict[str, Optional[str]]:
def environment(self) -> dict[str, str | None]:
"""Return environment for Docker add-on."""
addon_env = self.addon.environment or {}
@ -118,7 +118,7 @@ class DockerAddon(DockerInterface):
}
@property
def cgroups_rules(self) -> Optional[List[str]]:
def cgroups_rules(self) -> list[str] | None:
"""Return a list of needed cgroups permission."""
rules = set()
@ -177,7 +177,7 @@ class DockerAddon(DockerInterface):
return None
@property
def ports(self) -> Optional[Dict[str, Union[str, int, None]]]:
def ports(self) -> dict[str, str | int | None] | None:
"""Filter None from add-on ports."""
if self.addon.host_network or not self.addon.ports:
return None
@ -189,7 +189,7 @@ class DockerAddon(DockerInterface):
}
@property
def security_opt(self) -> List[str]:
def security_opt(self) -> list[str]:
"""Control security options."""
security = super().security_opt
@ -203,7 +203,7 @@ class DockerAddon(DockerInterface):
return security
@property
def tmpfs(self) -> Optional[Dict[str, str]]:
def tmpfs(self) -> dict[str, str] | None:
"""Return tmpfs for Docker add-on."""
tmpfs = {}
@ -219,7 +219,7 @@ class DockerAddon(DockerInterface):
return None
@property
def network_mapping(self) -> Dict[str, IPv4Address]:
def network_mapping(self) -> dict[str, IPv4Address]:
"""Return hosts mapping."""
return {
"supervisor": self.sys_docker.network.supervisor,
@ -227,23 +227,23 @@ class DockerAddon(DockerInterface):
}
@property
def network_mode(self) -> Optional[str]:
def network_mode(self) -> str | None:
"""Return network mode for add-on."""
if self.addon.host_network:
return "host"
return None
@property
def pid_mode(self) -> Optional[str]:
def pid_mode(self) -> str | None:
"""Return PID mode for add-on."""
if not self.addon.protected and self.addon.host_pid:
return "host"
return None
@property
def capabilities(self) -> Optional[List[str]]:
def capabilities(self) -> list[str] | None:
"""Generate needed capabilities."""
capabilities: Set[Capabilities] = set(self.addon.privileged)
capabilities: set[Capabilities] = set(self.addon.privileged)
# Need work with kernel modules
if self.addon.with_kernel_modules:
@ -259,9 +259,9 @@ class DockerAddon(DockerInterface):
return None
@property
def ulimits(self) -> Optional[List[docker.types.Ulimit]]:
def ulimits(self) -> list[docker.types.Ulimit] | None:
"""Generate ulimits for add-on."""
limits: List[docker.types.Ulimit] = []
limits: list[docker.types.Ulimit] = []
# Need schedule functions
if self.addon.with_realtime:
@ -277,7 +277,7 @@ class DockerAddon(DockerInterface):
return None
@property
def cpu_rt_runtime(self) -> Optional[int]:
def cpu_rt_runtime(self) -> int | None:
"""Limit CPU real-time runtime in microseconds."""
if not self.sys_docker.info.support_cpu_realtime:
return None
@ -288,7 +288,7 @@ class DockerAddon(DockerInterface):
return None
@property
def volumes(self) -> Dict[str, Dict[str, str]]:
def volumes(self) -> dict[str, dict[str, str]]:
"""Generate volumes for mappings."""
addon_mapping = self.addon.map_volumes
@ -494,7 +494,7 @@ class DockerAddon(DockerInterface):
self.sys_capture_exception(err)
def _install(
self, version: AwesomeVersion, image: Optional[str] = None, latest: bool = False
self, version: AwesomeVersion, image: str | None = None, latest: bool = False
) -> None:
"""Pull Docker image or build it.

View File

@ -1,6 +1,6 @@
"""Audio docker object."""
import logging
from typing import Dict, List, Optional
from typing import Optional
import docker
@ -34,7 +34,7 @@ class DockerAudio(DockerInterface, CoreSysAttributes):
return AUDIO_DOCKER_NAME
@property
def volumes(self) -> Dict[str, Dict[str, str]]:
def volumes(self) -> dict[str, dict[str, str]]:
"""Return Volumes for the mount."""
volumes = {
"/dev": {"bind": "/dev", "mode": "ro"},
@ -50,19 +50,19 @@ class DockerAudio(DockerInterface, CoreSysAttributes):
return volumes
@property
def cgroups_rules(self) -> List[str]:
def cgroups_rules(self) -> list[str]:
"""Return a list of needed cgroups permission."""
return self.sys_hardware.policy.get_cgroups_rules(
PolicyGroup.AUDIO
) + self.sys_hardware.policy.get_cgroups_rules(PolicyGroup.BLUETOOTH)
@property
def capabilities(self) -> List[str]:
def capabilities(self) -> list[str]:
"""Generate needed capabilities."""
return [cap.value for cap in (Capabilities.SYS_NICE, Capabilities.SYS_RESOURCE)]
@property
def ulimits(self) -> List[docker.types.Ulimit]:
def ulimits(self) -> list[docker.types.Ulimit]:
"""Generate ulimits for audio."""
# Pulseaudio by default tries to use real-time scheduling with priority of 5.
return [docker.types.Ulimit(name="rtprio", soft=10, hard=10)]

View File

@ -1,7 +1,7 @@
"""Init file for Supervisor Docker object."""
from ipaddress import IPv4Address
import logging
from typing import Awaitable, Dict, List, Optional
from typing import Awaitable, Optional
from awesomeversion import AwesomeVersion, AwesomeVersionCompare
import docker
@ -51,7 +51,7 @@ class DockerHomeAssistant(DockerInterface):
return self.sys_docker.network.gateway
@property
def cgroups_rules(self) -> List[str]:
def cgroups_rules(self) -> list[str]:
"""Return a list of needed cgroups permission."""
return (
self.sys_hardware.policy.get_cgroups_rules(PolicyGroup.UART)
@ -61,7 +61,7 @@ class DockerHomeAssistant(DockerInterface):
)
@property
def volumes(self) -> Dict[str, Dict[str, str]]:
def volumes(self) -> dict[str, dict[str, str]]:
"""Return Volumes for the mount."""
volumes = {
"/dev": {"bind": "/dev", "mode": "ro"},

View File

@ -3,7 +3,7 @@ import asyncio
from contextlib import suppress
import logging
import re
from typing import Any, Awaitable, Dict, List, Optional
from typing import Any, Awaitable, Optional
from awesomeversion import AwesomeVersion
from awesomeversion.strategy import AwesomeVersionStrategy
@ -44,7 +44,7 @@ class DockerInterface(CoreSysAttributes):
def __init__(self, coresys: CoreSys):
"""Initialize Docker base wrapper."""
self.coresys: CoreSys = coresys
self._meta: Optional[Dict[str, Any]] = None
self._meta: Optional[dict[str, Any]] = None
self.lock: asyncio.Lock = asyncio.Lock()
@property
@ -58,21 +58,21 @@ class DockerInterface(CoreSysAttributes):
return None
@property
def meta_config(self) -> Dict[str, Any]:
def meta_config(self) -> dict[str, Any]:
"""Return meta data of configuration for container/image."""
if not self._meta:
return {}
return self._meta.get("Config", {})
@property
def meta_host(self) -> Dict[str, Any]:
def meta_host(self) -> dict[str, Any]:
"""Return meta data of configuration for host."""
if not self._meta:
return {}
return self._meta.get("HostConfig", {})
@property
def meta_labels(self) -> Dict[str, str]:
def meta_labels(self) -> dict[str, str]:
"""Return meta data of labels for container/image."""
return self.meta_config.get("Labels") or {}
@ -102,7 +102,7 @@ class DockerInterface(CoreSysAttributes):
return self.lock.locked()
@property
def security_opt(self) -> List[str]:
def security_opt(self) -> list[str]:
"""Control security options."""
# Disable Seccomp / We don't support it official and it
# causes problems on some types of host systems.
@ -565,7 +565,7 @@ class DockerInterface(CoreSysAttributes):
Need run inside executor.
"""
available_version: List[AwesomeVersion] = []
available_version: list[AwesomeVersion] = []
try:
for image in self.sys_docker.images.list(self.image):
for tag in image.tags:

View File

@ -1,6 +1,5 @@
"""HA Cli docker object."""
import logging
from typing import List
from ..const import ENV_TIME
from ..coresys import CoreSysAttributes
@ -26,7 +25,7 @@ class DockerMulticast(DockerInterface, CoreSysAttributes):
return MULTICAST_DOCKER_NAME
@property
def capabilities(self) -> List[str]:
def capabilities(self) -> list[str]:
"""Generate needed capabilities."""
return [Capabilities.NET_ADMIN.value]

View File

@ -2,7 +2,7 @@
from contextlib import suppress
from ipaddress import IPv4Address
import logging
from typing import List, Optional
from typing import Optional
import docker
import requests
@ -30,9 +30,9 @@ class DockerNetwork:
return DOCKER_NETWORK
@property
def containers(self) -> List[docker.models.containers.Container]:
def containers(self) -> list[docker.models.containers.Container]:
"""Return of connected containers from network."""
containers: List[docker.models.containers.Container] = []
containers: list[docker.models.containers.Container] = []
for cid, _ in self.network.attrs.get("Containers", {}).items():
try:
containers.append(self.docker.containers.get(cid))
@ -99,7 +99,7 @@ class DockerNetwork:
def attach_container(
self,
container: docker.models.containers.Container,
alias: Optional[List[str]] = None,
alias: Optional[list[str]] = None,
ipv4: Optional[IPv4Address] = None,
) -> None:
"""Attach container to Supervisor network.

View File

@ -2,7 +2,6 @@
from __future__ import annotations
from pathlib import Path
from typing import Dict, List, Optional
import attr
import pyudev
@ -16,10 +15,10 @@ class Device:
path: Path = attr.ib(eq=False)
sysfs: Path = attr.ib(eq=True)
subsystem: str = attr.ib(eq=False)
parent: Optional[Path] = attr.ib(eq=False)
links: List[Path] = attr.ib(eq=False)
attributes: Dict[str, str] = attr.ib(eq=False)
children: List[Path] = attr.ib(eq=False)
parent: Path | None = attr.ib(eq=False)
links: list[Path] = attr.ib(eq=False)
attributes: dict[str, str] = attr.ib(eq=False)
children: list[Path] = attr.ib(eq=False)
@property
def major(self) -> int:
@ -32,7 +31,7 @@ class Device:
return int(self.attributes.get("MINOR", 0))
@property
def by_id(self) -> Optional[Path]:
def by_id(self) -> Path | None:
"""Return path by-id."""
for link in self.links:
if not link.match("/dev/*/by-id/*"):

View File

@ -1,7 +1,7 @@
"""Hardware Manager of Supervisor."""
import logging
from pathlib import Path
from typing import Dict, List, Optional
from typing import Optional
import pyudev
@ -24,7 +24,7 @@ class HardwareManager(CoreSysAttributes):
def __init__(self, coresys: CoreSys):
"""Initialize Hardware Monitor object."""
self.coresys: CoreSys = coresys
self._devices: Dict[str, Device] = {}
self._devices: dict[str, Device] = {}
self._udev = pyudev.Context()
self._montior: HwMonitor = HwMonitor(coresys)
@ -53,7 +53,7 @@ class HardwareManager(CoreSysAttributes):
return self._disk
@property
def devices(self) -> List[Device]:
def devices(self) -> list[Device]:
"""Return List of devices."""
return list(self._devices.values())
@ -66,7 +66,7 @@ class HardwareManager(CoreSysAttributes):
return device
raise HardwareNotFound()
def filter_devices(self, subsystem: Optional[UdevSubsystem] = None) -> List[Device]:
def filter_devices(self, subsystem: Optional[UdevSubsystem] = None) -> list[Device]:
"""Return a filtered list."""
devices = set()
for device in self.devices:

View File

@ -1,6 +1,5 @@
"""Policy / cgroups management of local host."""
import logging
from typing import Dict, List
from ..coresys import CoreSys, CoreSysAttributes
from .const import PolicyGroup, UdevSubsystem
@ -12,7 +11,7 @@ _LOGGER: logging.Logger = logging.getLogger(__name__)
# fmt: off
# https://www.kernel.org/doc/Documentation/admin-guide/devices.txt
_CGROUPS: Dict[PolicyGroup, List[int]] = {
_CGROUPS: dict[PolicyGroup, list[int]] = {
PolicyGroup.UART: [
204, # ttyAMA / ttySAC (tty)
188, # ttyUSB (tty)
@ -37,7 +36,7 @@ _CGROUPS: Dict[PolicyGroup, List[int]] = {
]
}
_CGROUPS_DYNAMIC_MAJOR: Dict[PolicyGroup, List[UdevSubsystem]] = {
_CGROUPS_DYNAMIC_MAJOR: dict[PolicyGroup, list[UdevSubsystem]] = {
PolicyGroup.USB: [
UdevSubsystem.HIDRAW
],
@ -54,7 +53,7 @@ _CGROUPS_DYNAMIC_MAJOR: Dict[PolicyGroup, List[UdevSubsystem]] = {
]
}
_CGROUPS_DYNAMIC_MINOR: Dict[PolicyGroup, List[UdevSubsystem]] = {
_CGROUPS_DYNAMIC_MINOR: dict[PolicyGroup, list[UdevSubsystem]] = {
PolicyGroup.UART: [
UdevSubsystem.SERIAL
]
@ -74,9 +73,9 @@ class HwPolicy(CoreSysAttributes):
"""Return true if device is in cgroup Policy."""
return device.major in _CGROUPS.get(group, [])
def get_cgroups_rules(self, group: PolicyGroup) -> List[str]:
def get_cgroups_rules(self, group: PolicyGroup) -> list[str]:
"""Generate cgroups rules for a policy group."""
cgroups: List[str] = [f"c {dev}:* rwm" for dev in _CGROUPS[group]]
cgroups: list[str] = [f"c {dev}:* rwm" for dev in _CGROUPS[group]]
# Lookup dynamic device groups from host
if group in _CGROUPS_DYNAMIC_MAJOR:

View File

@ -3,7 +3,7 @@ import asyncio
from contextlib import asynccontextmanager, suppress
from datetime import datetime, timedelta
import logging
from typing import Any, AsyncContextManager, Dict, Optional
from typing import Any, AsyncContextManager, Optional
import aiohttp
from aiohttp import hdrs
@ -64,12 +64,12 @@ class HomeAssistantAPI(CoreSysAttributes):
self,
method: str,
path: str,
json: Optional[Dict[str, Any]] = None,
json: Optional[dict[str, Any]] = None,
content_type: Optional[str] = None,
data: Any = None,
timeout: int = 30,
params: Optional[Dict[str, str]] = None,
headers: Optional[Dict[str, str]] = None,
params: Optional[dict[str, str]] = None,
headers: Optional[dict[str, str]] = None,
) -> AsyncContextManager[aiohttp.ClientResponse]:
"""Async context manager to make a request with right auth."""
url = f"{self.sys_homeassistant.api_url}/{path}"
@ -105,7 +105,7 @@ class HomeAssistantAPI(CoreSysAttributes):
raise HomeAssistantAPIError()
async def get_config(self) -> Dict[str, Any]:
async def get_config(self) -> dict[str, Any]:
"""Return Home Assistant config."""
async with self.make_request("get", "api/config") as resp:
if resp.status in (200, 201):

View File

@ -2,7 +2,7 @@
from datetime import timedelta
import logging
from pathlib import Path
from typing import Dict, Optional, Union
from typing import Optional, Union
from ..coresys import CoreSys, CoreSysAttributes
from ..exceptions import YamlFileError
@ -19,7 +19,7 @@ class HomeAssistantSecrets(CoreSysAttributes):
def __init__(self, coresys: CoreSys):
"""Initialize secret manager."""
self.coresys: CoreSys = coresys
self.secrets: Dict[str, Union[bool, float, int, str]] = {}
self.secrets: dict[str, Union[bool, float, int, str]] = {}
@property
def path_secrets(self) -> Path:

View File

@ -1,7 +1,7 @@
"""Home Assistant Websocket API."""
import asyncio
import logging
from typing import Any, Dict, Optional
from typing import Any, Optional
import aiohttp
from awesomeversion import AwesomeVersion
@ -30,7 +30,7 @@ class WSClient:
self.message_id: int = 0
self._lock: asyncio.Lock = asyncio.Lock()
async def async_send_command(self, message: Dict[str, Any]):
async def async_send_command(self, message: dict[str, Any]):
"""Send a websocket command."""
async with self._lock:
self.message_id += 1
@ -103,7 +103,7 @@ class HomeAssistantWebSocket(CoreSysAttributes):
return client
async def async_send_command(self, message: Dict[str, Any]):
async def async_send_command(self, message: dict[str, Any]):
"""Send a command with the WS client."""
if self.sys_core.state in CLOSING_STATES:
raise HomeAssistantWSNotSupported(
@ -136,7 +136,7 @@ class HomeAssistantWebSocket(CoreSysAttributes):
raise HomeAssistantWSError from err
async def async_supervisor_update_event(
self, key: str, data: Optional[Dict[str, Any]] = None
self, key: str, data: Optional[dict[str, Any]] = None
):
"""Send a supervisor/event command."""
try:
@ -155,13 +155,13 @@ class HomeAssistantWebSocket(CoreSysAttributes):
except HomeAssistantWSError as err:
_LOGGER.error(err)
def supervisor_update_event(self, key: str, data: Optional[Dict[str, Any]] = None):
def supervisor_update_event(self, key: str, data: Optional[dict[str, Any]] = None):
"""Send a supervisor/event command."""
if self.sys_core.state in CLOSING_STATES:
return
self.sys_create_task(self.async_supervisor_update_event(key, data))
def send_command(self, message: Dict[str, Any]):
def send_command(self, message: dict[str, Any]):
"""Send a supervisor/event command."""
if self.sys_core.state in CLOSING_STATES:
return

View File

@ -2,7 +2,6 @@
from contextlib import suppress
from functools import lru_cache
import logging
from typing import List
from ..const import BusEvent
from ..coresys import CoreSys, CoreSysAttributes
@ -65,12 +64,12 @@ class HostManager(CoreSysAttributes):
return self._sound
@property
def features(self) -> List[HostFeature]:
def features(self) -> list[HostFeature]:
"""Return a list of host features."""
return self.supported_features()
@lru_cache
def supported_features(self) -> List[HostFeature]:
def supported_features(self) -> list[HostFeature]:
"""Return a list of supported host features."""
features = []

View File

@ -4,7 +4,6 @@ from __future__ import annotations
import asyncio
from ipaddress import IPv4Address, IPv4Interface, IPv6Address, IPv6Interface
import logging
from typing import List, Optional, Union
import attr
@ -40,10 +39,10 @@ class NetworkManager(CoreSysAttributes):
def __init__(self, coresys: CoreSys):
"""Initialize system center handling."""
self.coresys: CoreSys = coresys
self._connectivity: Optional[bool] = None
self._connectivity: bool | None = None
@property
def connectivity(self) -> Optional[bool]:
def connectivity(self) -> bool | None:
"""Return true current connectivity state."""
return self._connectivity
@ -58,19 +57,19 @@ class NetworkManager(CoreSysAttributes):
)
@property
def interfaces(self) -> List[Interface]:
def interfaces(self) -> list[Interface]:
"""Return a dictionary of active interfaces."""
interfaces: List[Interface] = []
interfaces: list[Interface] = []
for inet in self.sys_dbus.network.interfaces.values():
interfaces.append(Interface.from_dbus_interface(inet))
return interfaces
@property
def dns_servers(self) -> List[str]:
def dns_servers(self) -> list[str]:
"""Return a list of local DNS servers."""
# Read all local dns servers
servers: List[str] = []
servers: list[str] = []
for config in self.sys_dbus.network.dns.configuration:
if config.vpn or not config.nameservers:
continue
@ -183,7 +182,7 @@ class NetworkManager(CoreSysAttributes):
)
await self.update()
async def scan_wifi(self, interface: Interface) -> List[AccessPoint]:
async def scan_wifi(self, interface: Interface) -> list[AccessPoint]:
"""Scan on Interface for AccessPoint."""
inet = self.sys_dbus.network.interfaces.get(interface.name)
@ -202,7 +201,7 @@ class NetworkManager(CoreSysAttributes):
await asyncio.sleep(5)
# Process AP
accesspoints: List[AccessPoint] = []
accesspoints: list[AccessPoint] = []
for ap_object in (await inet.wireless.get_all_accesspoints())[0]:
accesspoint = NetworkWirelessAP(ap_object)
@ -241,9 +240,9 @@ class IpConfig:
"""Represent a IP configuration."""
method: InterfaceMethod = attr.ib()
address: List[Union[IPv4Interface, IPv6Interface]] = attr.ib()
gateway: Optional[Union[IPv4Address, IPv6Address]] = attr.ib()
nameservers: List[Union[IPv4Address, IPv6Address]] = attr.ib()
address: list[IPv4Interface | IPv6Interface] = attr.ib()
gateway: IPv4Address | IPv6Address | None = attr.ib()
nameservers: list[IPv4Address | IPv6Address] = attr.ib()
@attr.s(slots=True)
@ -253,8 +252,8 @@ class WifiConfig:
mode: WifiMode = attr.ib()
ssid: str = attr.ib()
auth: AuthMethod = attr.ib()
psk: Optional[str] = attr.ib()
signal: Optional[int] = attr.ib()
psk: str | None = attr.ib()
signal: int | None = attr.ib()
@attr.s(slots=True)
@ -274,10 +273,10 @@ class Interface:
connected: bool = attr.ib()
primary: bool = attr.ib()
type: InterfaceType = attr.ib()
ipv4: Optional[IpConfig] = attr.ib()
ipv6: Optional[IpConfig] = attr.ib()
wifi: Optional[WifiConfig] = attr.ib()
vlan: Optional[VlanConfig] = attr.ib()
ipv4: IpConfig | None = attr.ib()
ipv6: IpConfig | None = attr.ib()
wifi: WifiConfig | None = attr.ib()
vlan: VlanConfig | None = attr.ib()
@staticmethod
def from_dbus_interface(inet: NetworkInterface) -> Interface:
@ -320,7 +319,7 @@ class Interface:
return mapping.get(method, InterfaceMethod.DISABLED)
@staticmethod
def _map_nm_connected(connection: Optional[NetworkConnection]) -> bool:
def _map_nm_connected(connection: NetworkConnection | None) -> bool:
"""Map connectivity state."""
if not connection:
return False
@ -340,7 +339,7 @@ class Interface:
return mapping[device_type]
@staticmethod
def _map_nm_wifi(inet: NetworkInterface) -> Optional[WifiConfig]:
def _map_nm_wifi(inet: NetworkInterface) -> WifiConfig | None:
"""Create mapping to nm wifi property."""
if inet.type != DeviceType.WIRELESS or not inet.settings:
return None
@ -376,7 +375,7 @@ class Interface:
)
@staticmethod
def _map_nm_vlan(inet: NetworkInterface) -> Optional[WifiConfig]:
def _map_nm_vlan(inet: NetworkInterface) -> WifiConfig | None:
"""Create mapping to nm vlan property."""
if inet.type != DeviceType.VLAN or not inet.settings:
return None

View File

@ -2,7 +2,7 @@
from datetime import timedelta
from enum import Enum
import logging
from typing import List, Optional
from typing import Optional
import attr
from pulsectl import Pulse, PulseError, PulseIndexError, PulseOperationFailed
@ -48,7 +48,7 @@ class AudioStream:
mute: bool = attr.ib()
default: bool = attr.ib()
card: Optional[int] = attr.ib()
applications: List[AudioApplication] = attr.ib()
applications: list[AudioApplication] = attr.ib()
@attr.s(frozen=True)
@ -67,7 +67,7 @@ class SoundCard:
name: str = attr.ib()
index: int = attr.ib()
driver: str = attr.ib()
profiles: List[SoundProfile] = attr.ib()
profiles: list[SoundProfile] = attr.ib()
class SoundControl(CoreSysAttributes):
@ -76,28 +76,28 @@ class SoundControl(CoreSysAttributes):
def __init__(self, coresys: CoreSys) -> None:
"""Initialize PulseAudio sound control."""
self.coresys: CoreSys = coresys
self._cards: List[SoundCard] = []
self._inputs: List[AudioStream] = []
self._outputs: List[AudioStream] = []
self._applications: List[AudioApplication] = []
self._cards: list[SoundCard] = []
self._inputs: list[AudioStream] = []
self._outputs: list[AudioStream] = []
self._applications: list[AudioApplication] = []
@property
def cards(self) -> List[SoundCard]:
def cards(self) -> list[SoundCard]:
"""Return a list of available sound cards and profiles."""
return self._cards
@property
def inputs(self) -> List[AudioStream]:
def inputs(self) -> list[AudioStream]:
"""Return a list of available input streams."""
return self._inputs
@property
def outputs(self) -> List[AudioStream]:
def outputs(self) -> list[AudioStream]:
"""Return a list of available output streams."""
return self._outputs
@property
def applications(self) -> List[AudioApplication]:
def applications(self) -> list[AudioApplication]:
"""Return a list of available application streams."""
return self._applications
@ -311,7 +311,7 @@ class SoundControl(CoreSysAttributes):
# Update Sound Card
self._cards.clear()
for card in pulse.card_list():
sound_profiles: List[SoundProfile] = []
sound_profiles: list[SoundProfile] = []
# Generate profiles
for profile in card.profile_list:

View File

@ -3,7 +3,7 @@ from datetime import timedelta
import logging
import random
import secrets
from typing import Dict, List, Optional
from typing import Optional
from .addons.addon import Addon
from .const import ATTR_PORTS, ATTR_SESSION, FILE_HASSIO_INGRESS
@ -23,7 +23,7 @@ class Ingress(FileConfiguration, CoreSysAttributes):
"""Initialize updater."""
super().__init__(FILE_HASSIO_INGRESS, SCHEMA_INGRESS_CONFIG)
self.coresys: CoreSys = coresys
self.tokens: Dict[str, str] = {}
self.tokens: dict[str, str] = {}
def get(self, token: str) -> Optional[Addon]:
"""Return addon they have this ingress token."""
@ -32,17 +32,17 @@ class Ingress(FileConfiguration, CoreSysAttributes):
return self.sys_addons.get(self.tokens[token], local_only=True)
@property
def sessions(self) -> Dict[str, float]:
def sessions(self) -> dict[str, float]:
"""Return sessions."""
return self._data[ATTR_SESSION]
@property
def ports(self) -> Dict[str, int]:
def ports(self) -> dict[str, int]:
"""Return list of dynamic ports."""
return self._data[ATTR_PORTS]
@property
def addons(self) -> List[Addon]:
def addons(self) -> list[Addon]:
"""Return list of ingress Add-ons."""
addons = []
for addon in self.sys_addons.installed:

View File

@ -1,6 +1,6 @@
"""Supervisor job manager."""
import logging
from typing import Dict, List, Optional
from typing import Optional
from ..coresys import CoreSys, CoreSysAttributes
from ..utils.common import FileConfiguration
@ -56,20 +56,20 @@ class JobManager(FileConfiguration, CoreSysAttributes):
"""Initialize the JobManager class."""
super().__init__(FILE_CONFIG_JOBS, SCHEMA_JOBS_CONFIG)
self.coresys: CoreSys = coresys
self._jobs: Dict[str, SupervisorJob] = {}
self._jobs: dict[str, SupervisorJob] = {}
@property
def jobs(self) -> List[SupervisorJob]:
def jobs(self) -> list[SupervisorJob]:
"""Return a list of current jobs."""
return self._jobs
@property
def ignore_conditions(self) -> List[JobCondition]:
def ignore_conditions(self) -> list[JobCondition]:
"""Return a list of ingore condition."""
return self._data[ATTR_IGNORE_CONDITIONS]
@ignore_conditions.setter
def ignore_conditions(self, value: List[JobCondition]) -> None:
def ignore_conditions(self, value: list[JobCondition]) -> None:
"""Set a list of ignored condition."""
self._data[ATTR_IGNORE_CONDITIONS] = value

View File

@ -3,7 +3,7 @@ import asyncio
from datetime import datetime, timedelta
from functools import wraps
import logging
from typing import Any, List, Optional, Tuple
from typing import Any, Optional
import sentry_sdk
@ -23,7 +23,7 @@ class Job(CoreSysAttributes):
def __init__(
self,
name: Optional[str] = None,
conditions: Optional[List[JobCondition]] = None,
conditions: Optional[list[JobCondition]] = None,
cleanup: bool = True,
on_condition: Optional[JobException] = None,
limit: Optional[JobExecutionLimit] = None,
@ -47,7 +47,7 @@ class Job(CoreSysAttributes):
):
raise RuntimeError("Using Job without a Throttle period!")
def _post_init(self, args: Tuple[Any]) -> None:
def _post_init(self, args: tuple[Any]) -> None:
"""Runtime init."""
if self.name is None:
self.name = str(self._method.__qualname__).lower().replace(".", "_")

View File

@ -2,7 +2,7 @@
import asyncio
from datetime import date, datetime, time, timedelta
import logging
from typing import Awaitable, Callable, List, Optional, Union
from typing import Awaitable, Callable, Optional, Union
from uuid import UUID, uuid4
import async_timeout
@ -32,7 +32,7 @@ class Scheduler(CoreSysAttributes):
def __init__(self, coresys: CoreSys):
"""Initialize task schedule."""
self.coresys: CoreSys = coresys
self._tasks: List[_Task] = []
self._tasks: list[_Task] = []
def register_task(
self,
@ -96,7 +96,7 @@ class Scheduler(CoreSysAttributes):
async def shutdown(self, timeout=10) -> None:
"""Shutdown all task inside the scheduler."""
running: List[asyncio.tasks.Task] = []
running: list[asyncio.tasks.Task] = []
# Cancel next task / get running list
_LOGGER.info("Shutting down scheduled tasks")

View File

@ -1,7 +1,7 @@
"""Home Assistant Operating-System DataDisk."""
import logging
from pathlib import Path
from typing import List, Optional
from typing import Optional
from awesomeversion import AwesomeVersion
@ -34,9 +34,9 @@ class DataDisk(CoreSysAttributes):
return self.sys_dbus.agent.datadisk.current_device
@property
def available_disks(self) -> List[Path]:
def available_disks(self) -> list[Path]:
"""Return a list of possible new disk locations."""
device_paths: List[Path] = []
device_paths: list[Path] = []
for device in self.sys_hardware.devices:
# Filter devices out which can't be a target
if (

View File

@ -7,7 +7,7 @@ from contextlib import suppress
from ipaddress import IPv4Address
import logging
from pathlib import Path
from typing import List, Optional
from typing import Optional
import attr
from awesomeversion import AwesomeVersion
@ -43,7 +43,7 @@ class HostEntry:
"""Single entry in hosts."""
ip_address: IPv4Address = attr.ib()
names: List[str] = attr.ib()
names: list[str] = attr.ib()
class PluginDns(PluginBase):
@ -58,7 +58,7 @@ class PluginDns(PluginBase):
self.resolv_template: Optional[jinja2.Template] = None
self.hosts_template: Optional[jinja2.Template] = None
self._hosts: List[HostEntry] = []
self._hosts: list[HostEntry] = []
self._loop: bool = False
@property
@ -72,9 +72,9 @@ class PluginDns(PluginBase):
return Path(self.sys_config.path_dns, "coredns.json")
@property
def locals(self) -> List[str]:
def locals(self) -> list[str]:
"""Return list of local system DNS servers."""
servers: List[str] = []
servers: list[str] = []
for server in [
f"dns://{server!s}" for server in self.sys_host.network.dns_servers
]:
@ -84,12 +84,12 @@ class PluginDns(PluginBase):
return servers
@property
def servers(self) -> List[str]:
def servers(self) -> list[str]:
"""Return list of DNS servers."""
return self._data[ATTR_SERVERS]
@servers.setter
def servers(self, value: List[str]) -> None:
def servers(self, value: list[str]) -> None:
"""Return list of DNS servers."""
self._data[ATTR_SERVERS] = value
@ -259,8 +259,8 @@ class PluginDns(PluginBase):
def _write_config(self) -> None:
"""Write CoreDNS config."""
debug: bool = self.sys_config.logging == LogLevel.DEBUG
dns_servers: List[str] = []
dns_locals: List[str] = []
dns_servers: list[str] = []
dns_locals: list[str] = []
# Prepare DNS serverlist: Prio 1 Manual, Prio 2 Local, Prio 3 Fallback
if not self._loop:
@ -317,12 +317,12 @@ class PluginDns(PluginBase):
_LOGGER.error("Can't update hosts: %s", err)
raise CoreDNSError() from err
def add_host(self, ipv4: IPv4Address, names: List[str], write: bool = True) -> None:
def add_host(self, ipv4: IPv4Address, names: list[str], write: bool = True) -> None:
"""Add a new host entry."""
if not ipv4 or ipv4 == IPv4Address("0.0.0.0"):
return
hostnames: List[str] = []
hostnames: list[str] = []
for name in names:
hostnames.append(name)
hostnames.append(f"{name}.{DNS_SUFFIX}")
@ -355,7 +355,7 @@ class PluginDns(PluginBase):
if write:
self.write_hosts()
def _search_host(self, names: List[str]) -> Optional[HostEntry]:
def _search_host(self, names: list[str]) -> Optional[HostEntry]:
"""Search a host entry."""
for entry in self._hosts:
for name in names:

View File

@ -1,7 +1,7 @@
"""Helpers to checks the system."""
from importlib import import_module
import logging
from typing import Any, Dict, List
from typing import Any
from ..const import ATTR_CHECKS
from ..coresys import CoreSys, CoreSysAttributes
@ -18,17 +18,17 @@ class ResolutionCheck(CoreSysAttributes):
def __init__(self, coresys: CoreSys) -> None:
"""Initialize the checks class."""
self.coresys = coresys
self._checks: Dict[str, CheckBase] = {}
self._checks: dict[str, CheckBase] = {}
self._load()
@property
def data(self) -> Dict[str, Any]:
def data(self) -> dict[str, Any]:
"""Return data."""
return self.sys_resolution.data[ATTR_CHECKS]
@property
def all_checks(self) -> List[CheckBase]:
def all_checks(self) -> list[CheckBase]:
"""Return all list of all checks."""
return list(self._checks.values())

View File

@ -1,7 +1,7 @@
"""Helpers to check core security."""
from datetime import timedelta
import logging
from typing import List, Optional
from typing import Optional
from ...const import AddonState, CoreState
from ...coresys import CoreSys
@ -99,6 +99,6 @@ class CheckAddonPwned(CheckBase):
return ContextType.ADDON
@property
def states(self) -> List[CoreState]:
def states(self) -> list[CoreState]:
"""Return a list of valid states when this check can run."""
return [CoreState.RUNNING]

View File

@ -1,7 +1,7 @@
"""Baseclass for system checks."""
from abc import ABC, abstractmethod
import logging
from typing import List, Optional
from typing import Optional
from ...const import ATTR_ENABLED, CoreState
from ...coresys import CoreSys, CoreSysAttributes
@ -70,7 +70,7 @@ class CheckBase(ABC, CoreSysAttributes):
"""Return a ContextType enum."""
@property
def states(self) -> List[CoreState]:
def states(self) -> list[CoreState]:
"""Return a list of valid states when this check can run."""
return []

View File

@ -1,7 +1,7 @@
"""Helpers to check core security."""
from enum import Enum
from pathlib import Path
from typing import List, Optional
from typing import Optional
from awesomeversion import AwesomeVersion, AwesomeVersionException
@ -64,6 +64,6 @@ class CheckCoreSecurity(CheckBase):
return ContextType.CORE
@property
def states(self) -> List[CoreState]:
def states(self) -> list[CoreState]:
"""Return a list of valid states when this check can run."""
return [CoreState.RUNNING, CoreState.STARTUP]

View File

@ -1,6 +1,6 @@
"""Helpers to check core trust."""
import logging
from typing import List, Optional
from typing import Optional
from ...const import CoreState
from ...coresys import CoreSys
@ -54,6 +54,6 @@ class CheckCoreTrust(CheckBase):
return ContextType.CORE
@property
def states(self) -> List[CoreState]:
def states(self) -> list[CoreState]:
"""Return a list of valid states when this check can run."""
return [CoreState.RUNNING, CoreState.STARTUP]

View File

@ -1,5 +1,5 @@
"""Helpers to check and fix issues with free space."""
from typing import List, Optional
from typing import Optional
from ...backups.const import BackupType
from ...const import CoreState
@ -33,7 +33,7 @@ class CheckFreeSpace(CheckBase):
)
return
suggestions: List[SuggestionType] = []
suggestions: list[SuggestionType] = []
if (
len(
[
@ -67,6 +67,6 @@ class CheckFreeSpace(CheckBase):
return ContextType.SYSTEM
@property
def states(self) -> List[CoreState]:
def states(self) -> list[CoreState]:
"""Return a list of valid states when this check can run."""
return [CoreState.RUNNING, CoreState.STARTUP]

View File

@ -1,6 +1,6 @@
"""Helpers to check plugin trust."""
import logging
from typing import List, Optional
from typing import Optional
from ...const import CoreState
from ...coresys import CoreSys
@ -72,6 +72,6 @@ class CheckPluginTrust(CheckBase):
return ContextType.PLUGIN
@property
def states(self) -> List[CoreState]:
def states(self) -> list[CoreState]:
"""Return a list of valid states when this check can run."""
return [CoreState.RUNNING, CoreState.STARTUP]

View File

@ -1,6 +1,6 @@
"""Helpers to check supervisor trust."""
import logging
from typing import List, Optional
from typing import Optional
from ...const import CoreState
from ...coresys import CoreSys
@ -54,6 +54,6 @@ class CheckSupervisorTrust(CheckBase):
return ContextType.SUPERVISOR
@property
def states(self) -> List[CoreState]:
def states(self) -> list[CoreState]:
"""Return a list of valid states when this check can run."""
return [CoreState.RUNNING, CoreState.STARTUP]

View File

@ -1,7 +1,6 @@
"""Helpers to evaluate the system."""
from importlib import import_module
import logging
from typing import Dict, List, Set
from ..coresys import CoreSys, CoreSysAttributes
from ..exceptions import ResolutionNotFound
@ -25,13 +24,13 @@ class ResolutionEvaluation(CoreSysAttributes):
def __init__(self, coresys: CoreSys) -> None:
"""Initialize the evaluation class."""
self.coresys = coresys
self.cached_images: Set[str] = set()
self._evalutions: Dict[str, EvaluateBase] = {}
self.cached_images: set[str] = set()
self._evalutions: dict[str, EvaluateBase] = {}
self._load()
@property
def all_evaluations(self) -> List[EvaluateBase]:
def all_evaluations(self) -> list[EvaluateBase]:
"""Return all list of all checks."""
return list(self._evalutions.values())

View File

@ -1,6 +1,5 @@
"""Evaluation class for AppArmor."""
from pathlib import Path
from typing import List
from ...const import CoreState
from ...coresys import CoreSys
@ -29,7 +28,7 @@ class EvaluateAppArmor(EvaluateBase):
return "AppArmor is required for Home Assistant."
@property
def states(self) -> List[CoreState]:
def states(self) -> list[CoreState]:
"""Return a list of valid states when this evaluation can run."""
return [CoreState.INITIALIZE]

View File

@ -1,7 +1,6 @@
"""Baseclass for system evaluations."""
from abc import ABC, abstractmethod
import logging
from typing import List
from ...const import CoreState
from ...coresys import CoreSys, CoreSysAttributes
@ -54,6 +53,6 @@ class EvaluateBase(ABC, CoreSysAttributes):
"""Return a string that is printed when self.evaluate is False."""
@property
def states(self) -> List[CoreState]:
def states(self) -> list[CoreState]:
"""Return a list of valid states when this evaluation can run."""
return []

View File

@ -1,6 +1,6 @@
"""Evaluation class for container."""
import logging
from typing import Any, List
from typing import Any
from docker.errors import DockerException
from requests import RequestException
@ -43,7 +43,7 @@ class EvaluateContainer(EvaluateBase):
return f"Found images: {self._images} which are not supported, remove these from the host!"
@property
def states(self) -> List[CoreState]:
def states(self) -> list[CoreState]:
"""Return a list of valid states when this evaluation can run."""
return [CoreState.SETUP, CoreState.RUNNING, CoreState.INITIALIZE]
@ -68,7 +68,7 @@ class EvaluateContainer(EvaluateBase):
self._images.add(image_name)
return len(self._images) != 0
def _get_images(self) -> List[Any]:
def _get_images(self) -> list[Any]:
"""Return a list of images."""
images = []

View File

@ -1,5 +1,4 @@
"""Evaluation class for Content Trust."""
from typing import List
from ...const import CoreState
from ...coresys import CoreSys
@ -26,7 +25,7 @@ class EvaluateContentTrust(EvaluateBase):
return "System run with disabled trusted content security."
@property
def states(self) -> List[CoreState]:
def states(self) -> list[CoreState]:
"""Return a list of valid states when this evaluation can run."""
return [CoreState.INITIALIZE, CoreState.SETUP, CoreState.RUNNING]

View File

@ -1,5 +1,4 @@
"""Evaluation class for dbus."""
from typing import List
from ...const import SOCKET_DBUS, CoreState
from ...coresys import CoreSys
@ -26,7 +25,7 @@ class EvaluateDbus(EvaluateBase):
return "D-Bus is required for Home Assistant."
@property
def states(self) -> List[CoreState]:
def states(self) -> list[CoreState]:
"""Return a list of valid states when this evaluation can run."""
return [CoreState.INITIALIZE]

View File

@ -1,6 +1,5 @@
"""Evaluation class for docker configuration."""
import logging
from typing import List
from ...const import CoreState
from ...coresys import CoreSys
@ -32,7 +31,7 @@ class EvaluateDockerConfiguration(EvaluateBase):
return "The configuration of Docker is not supported"
@property
def states(self) -> List[CoreState]:
def states(self) -> list[CoreState]:
"""Return a list of valid states when this evaluation can run."""
return [CoreState.INITIALIZE]

View File

@ -1,5 +1,4 @@
"""Evaluation class for docker version."""
from typing import List
from ...const import CoreState
from ...coresys import CoreSys
@ -26,7 +25,7 @@ class EvaluateDockerVersion(EvaluateBase):
return f"Docker version '{self.sys_docker.info.version}' is not supported by the Supervisor!"
@property
def states(self) -> List[CoreState]:
def states(self) -> list[CoreState]:
"""Return a list of valid states when this evaluation can run."""
return [CoreState.INITIALIZE]

View File

@ -1,5 +1,4 @@
"""Evaluation class for Job Conditions."""
from typing import List
from ...const import CoreState
from ...coresys import CoreSys
@ -26,7 +25,7 @@ class EvaluateJobConditions(EvaluateBase):
return "Found unsupported job conditions settings."
@property
def states(self) -> List[CoreState]:
def states(self) -> list[CoreState]:
"""Return a list of valid states when this evaluation can run."""
return [CoreState.INITIALIZE, CoreState.SETUP, CoreState.RUNNING]

View File

@ -1,7 +1,6 @@
"""Evaluation class for lxc."""
from contextlib import suppress
from pathlib import Path
from typing import List
from ...const import CoreState
from ...coresys import CoreSys
@ -28,7 +27,7 @@ class EvaluateLxc(EvaluateBase):
return "Detected Docker running inside LXC."
@property
def states(self) -> List[CoreState]:
def states(self) -> list[CoreState]:
"""Return a list of valid states when this evaluation can run."""
return [CoreState.INITIALIZE]

View File

@ -1,5 +1,4 @@
"""Evaluation class for network manager."""
from typing import List
from ...const import CoreState
from ...coresys import CoreSys
@ -27,7 +26,7 @@ class EvaluateNetworkManager(EvaluateBase):
return "NetworkManager is not correctly configured"
@property
def states(self) -> List[CoreState]:
def states(self) -> list[CoreState]:
"""Return a list of valid states when this evaluation can run."""
return [CoreState.SETUP, CoreState.RUNNING]

View File

@ -1,5 +1,4 @@
"""Evaluation class for operating system."""
from typing import List
from ...const import CoreState
from ...coresys import CoreSys
@ -28,7 +27,7 @@ class EvaluateOperatingSystem(EvaluateBase):
return f"Detected unsupported OS: {self.sys_host.info.operating_system}"
@property
def states(self) -> List[CoreState]:
def states(self) -> list[CoreState]:
"""Return a list of valid states when this evaluation can run."""
return [CoreState.SETUP]

View File

@ -1,5 +1,4 @@
"""Evaluation class for privileged."""
from typing import List
from ...const import CoreState
from ...coresys import CoreSys
@ -26,7 +25,7 @@ class EvaluatePrivileged(EvaluateBase):
return "Supervisor does not run in Privileged mode."
@property
def states(self) -> List[CoreState]:
def states(self) -> list[CoreState]:
"""Return a list of valid states when this evaluation can run."""
return [CoreState.INITIALIZE]

View File

@ -1,7 +1,6 @@
"""Evaluation class for Content Trust."""
import logging
from pathlib import Path
from typing import List
from ...const import CoreState
from ...coresys import CoreSys
@ -32,7 +31,7 @@ class EvaluateSourceMods(EvaluateBase):
return "System detect unauthorized source code modifications."
@property
def states(self) -> List[CoreState]:
def states(self) -> list[CoreState]:
"""Return a list of valid states when this evaluation can run."""
return [CoreState.RUNNING]

View File

@ -1,5 +1,4 @@
"""Evaluation class for systemd."""
from typing import List
from ...const import CoreState
from ...coresys import CoreSys
@ -27,7 +26,7 @@ class EvaluateSystemd(EvaluateBase):
return "Systemd is not correctly working"
@property
def states(self) -> List[CoreState]:
def states(self) -> list[CoreState]:
"""Return a list of valid states when this evaluation can run."""
return [CoreState.SETUP]

View File

@ -1,6 +1,5 @@
"""Helpers to fixup the system."""
import logging
from typing import List
from ..coresys import CoreSys, CoreSysAttributes
from ..jobs.const import JobCondition
@ -30,7 +29,7 @@ class ResolutionFixup(CoreSysAttributes):
self._store_execute_remove = FixupStoreExecuteRemove(coresys)
@property
def all_fixes(self) -> List[FixupBase]:
def all_fixes(self) -> list[FixupBase]:
"""Return a list of all fixups.
Order can be important!

View File

@ -1,7 +1,7 @@
"""Baseclass for system fixup."""
from abc import ABC, abstractmethod
import logging
from typing import List, Optional
from typing import Optional
from ...coresys import CoreSys, CoreSysAttributes
from ...exceptions import ResolutionFixupError
@ -63,7 +63,7 @@ class FixupBase(ABC, CoreSysAttributes):
"""Return a ContextType enum."""
@property
def issues(self) -> List[IssueType]:
def issues(self) -> list[IssueType]:
"""Return a IssueType enum list."""
return []

View File

@ -1,6 +1,6 @@
"""Helpers to check and fix issues with free space."""
import logging
from typing import List, Optional
from typing import Optional
from ...backups.const import BackupType
from ..const import MINIMUM_FULL_BACKUPS, ContextType, IssueType, SuggestionType
@ -36,6 +36,6 @@ class FixupClearFullBackup(FixupBase):
return ContextType.SYSTEM
@property
def issues(self) -> List[IssueType]:
def issues(self) -> list[IssueType]:
"""Return a IssueType enum list."""
return [IssueType.FREE_SPACE]

View File

@ -1,6 +1,6 @@
"""Helpers to check and fix issues with free space."""
import logging
from typing import List, Optional
from typing import Optional
from ...exceptions import (
ResolutionFixupError,
@ -50,7 +50,7 @@ class FixupStoreExecuteReload(FixupBase):
return ContextType.STORE
@property
def issues(self) -> List[IssueType]:
def issues(self) -> list[IssueType]:
"""Return a IssueType enum list."""
return [IssueType.FATAL_ERROR]

View File

@ -1,6 +1,6 @@
"""Helpers to check and fix issues with free space."""
import logging
from typing import List, Optional
from typing import Optional
from supervisor.exceptions import ResolutionFixupError, StoreError, StoreNotFound
@ -44,7 +44,7 @@ class FixupStoreExecuteRemove(FixupBase):
return ContextType.STORE
@property
def issues(self) -> List[IssueType]:
def issues(self) -> list[IssueType]:
"""Return a IssueType enum list."""
return [IssueType.CORRUPT_REPOSITORY]

View File

@ -1,6 +1,6 @@
"""Helpers to check and fix issues with free space."""
import logging
from typing import List, Optional
from typing import Optional
from ...exceptions import (
ResolutionFixupError,
@ -52,7 +52,7 @@ class FixupStoreExecuteReset(FixupBase):
return ContextType.STORE
@property
def issues(self) -> List[IssueType]:
def issues(self) -> list[IssueType]:
"""Return a IssueType enum list."""
return [IssueType.CORRUPT_REPOSITORY, IssueType.FATAL_ERROR]

Some files were not shown because too many files have changed in this diff Show More