Fix mypy issues in store, utils and all other source files (#5957)

* Fix mypy issues in store module

* Fix mypy issues in utils module

* Fix mypy issues in all remaining source files

* Fix ingress user typeddict

* Fixes from feedback

* Fix mypy issues after installing docker-types
This commit is contained in:
Mike Degatano 2025-06-18 12:40:12 -04:00 committed by GitHub
parent 5c04249e41
commit aea15b65b7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
33 changed files with 275 additions and 210 deletions

View File

@ -40,7 +40,7 @@ class CpuArch(CoreSysAttributes):
@property
def supervisor(self) -> str:
"""Return supervisor arch."""
return self.sys_supervisor.arch
return self.sys_supervisor.arch or self._default_arch
@property
def supported(self) -> list[str]:
@ -91,4 +91,14 @@ class CpuArch(CoreSysAttributes):
for check, value in MAP_CPU.items():
if cpu.startswith(check):
return value
return self.sys_supervisor.arch
if self.sys_supervisor.arch:
_LOGGER.warning(
"Unknown CPU architecture %s, falling back to Supervisor architecture.",
cpu,
)
return self.sys_supervisor.arch
_LOGGER.warning(
"Unknown CPU architecture %s, assuming CPU architecture equals Supervisor architecture.",
cpu,
)
return cpu

View File

@ -3,10 +3,10 @@
import asyncio
import hashlib
import logging
from typing import Any
from typing import Any, TypedDict, cast
from .addons.addon import Addon
from .const import ATTR_ADDON, ATTR_PASSWORD, ATTR_TYPE, ATTR_USERNAME, FILE_HASSIO_AUTH
from .const import ATTR_PASSWORD, ATTR_TYPE, ATTR_USERNAME, FILE_HASSIO_AUTH
from .coresys import CoreSys, CoreSysAttributes
from .exceptions import (
AuthError,
@ -21,6 +21,17 @@ from .validate import SCHEMA_AUTH_CONFIG
_LOGGER: logging.Logger = logging.getLogger(__name__)
class BackendAuthRequest(TypedDict):
"""Model for a backend auth request.
https://github.com/home-assistant/core/blob/ed9503324d9d255e6fb077f1614fb6d55800f389/homeassistant/components/hassio/auth.py#L66-L73
"""
username: str
password: str
addon: str
class Auth(FileConfiguration, CoreSysAttributes):
"""Manage SSO for Add-ons with Home Assistant user."""
@ -74,6 +85,9 @@ class Auth(FileConfiguration, CoreSysAttributes):
"""Check username login."""
if password is None:
raise AuthError("None as password is not supported!", _LOGGER.error)
if username is None:
raise AuthError("None as username is not supported!", _LOGGER.error)
_LOGGER.info("Auth request from '%s' for '%s'", addon.slug, username)
# Get from cache
@ -103,11 +117,12 @@ class Auth(FileConfiguration, CoreSysAttributes):
async with self.sys_homeassistant.api.make_request(
"post",
"api/hassio_auth",
json={
ATTR_USERNAME: username,
ATTR_PASSWORD: password,
ATTR_ADDON: addon.slug,
},
json=cast(
dict[str, Any],
BackendAuthRequest(
username=username, password=password, addon=addon.slug
),
),
) as req:
if req.status == 200:
_LOGGER.info("Successful login for '%s'", username)

View File

@ -285,7 +285,7 @@ def check_environment() -> None:
_LOGGER.critical("Can't find Docker socket!")
def register_signal_handlers(loop: asyncio.BaseEventLoop, coresys: CoreSys) -> None:
def register_signal_handlers(loop: asyncio.AbstractEventLoop, coresys: CoreSys) -> None:
"""Register SIGTERM, SIGHUP and SIGKILL to stop the Supervisor."""
try:
loop.add_signal_handler(

View File

@ -2,7 +2,7 @@
from __future__ import annotations
from collections.abc import Awaitable, Callable
from collections.abc import Callable, Coroutine
import logging
from typing import Any
@ -19,7 +19,7 @@ class EventListener:
"""Event listener."""
event_type: BusEvent = attr.ib()
callback: Callable[[Any], Awaitable[None]] = attr.ib()
callback: Callable[[Any], Coroutine[Any, Any, None]] = attr.ib()
class Bus(CoreSysAttributes):
@ -31,7 +31,7 @@ class Bus(CoreSysAttributes):
self._listeners: dict[BusEvent, list[EventListener]] = {}
def register_event(
self, event: BusEvent, callback: Callable[[Any], Awaitable[None]]
self, event: BusEvent, callback: Callable[[Any], Coroutine[Any, Any, None]]
) -> EventListener:
"""Register callback for an event."""
listener = EventListener(event, callback)

View File

@ -66,7 +66,7 @@ _UTC = "UTC"
class CoreConfig(FileConfiguration):
"""Hold all core config data."""
def __init__(self):
def __init__(self) -> None:
"""Initialize config object."""
super().__init__(FILE_HASSIO_CONFIG, SCHEMA_SUPERVISOR_CONFIG)
self._timezone_tzinfo: tzinfo | None = None

View File

@ -5,7 +5,7 @@ from enum import StrEnum
from ipaddress import IPv4Network, IPv6Network
from pathlib import Path
from sys import version_info as systemversion
from typing import Self
from typing import NotRequired, Self, TypedDict
from aiohttp import __version__ as aiohttpversion
@ -415,10 +415,12 @@ class AddonBoot(StrEnum):
MANUAL = "manual"
@classmethod
def _missing_(cls, value: str) -> Self | None:
def _missing_(cls, value: object) -> Self | None:
"""Convert 'forced' config values to their counterpart."""
if value == AddonBootConfig.MANUAL_ONLY:
return AddonBoot.MANUAL
for member in cls:
if member == AddonBoot.MANUAL:
return member
return None
@ -515,6 +517,16 @@ class CpuArch(StrEnum):
AMD64 = "amd64"
class IngressSessionDataUserDict(TypedDict):
"""Response object for ingress session user."""
id: str
username: NotRequired[str | None]
# Name is an alias for displayname, only one should be used
displayname: NotRequired[str | None]
name: NotRequired[str | None]
@dataclass
class IngressSessionDataUser:
"""Format of an IngressSessionDataUser object."""
@ -523,38 +535,42 @@ class IngressSessionDataUser:
display_name: str | None = None
username: str | None = None
def to_dict(self) -> dict[str, str | None]:
def to_dict(self) -> IngressSessionDataUserDict:
"""Get dictionary representation."""
return {
ATTR_ID: self.id,
ATTR_DISPLAYNAME: self.display_name,
ATTR_USERNAME: self.username,
}
return IngressSessionDataUserDict(
id=self.id, displayname=self.display_name, username=self.username
)
@classmethod
def from_dict(cls, data: dict[str, str | None]) -> Self:
def from_dict(cls, data: IngressSessionDataUserDict) -> Self:
"""Return object from dictionary representation."""
return cls(
id=data[ATTR_ID],
display_name=data.get(ATTR_DISPLAYNAME),
username=data.get(ATTR_USERNAME),
id=data["id"],
display_name=data.get("displayname") or data.get("name"),
username=data.get("username"),
)
class IngressSessionDataDict(TypedDict):
"""Response object for ingress session data."""
user: IngressSessionDataUserDict
@dataclass
class IngressSessionData:
"""Format of an IngressSessionData object."""
user: IngressSessionDataUser
def to_dict(self) -> dict[str, dict[str, str | None]]:
def to_dict(self) -> IngressSessionDataDict:
"""Get dictionary representation."""
return {ATTR_USER: self.user.to_dict()}
return IngressSessionDataDict(user=self.user.to_dict())
@classmethod
def from_dict(cls, data: dict[str, dict[str, str | None]]) -> Self:
def from_dict(cls, data: IngressSessionDataDict) -> Self:
"""Return object from dictionary representation."""
return cls(user=IngressSessionDataUser.from_dict(data[ATTR_USER]))
return cls(user=IngressSessionDataUser.from_dict(data["user"]))
STARTING_STATES = [

View File

@ -28,7 +28,7 @@ from .homeassistant.core import LANDINGPAGE
from .resolution.const import ContextType, IssueType, SuggestionType, UnhealthyReason
from .utils.dt import utcnow
from .utils.sentry import async_capture_exception
from .utils.whoami import WhoamiData, retrieve_whoami
from .utils.whoami import retrieve_whoami
_LOGGER: logging.Logger = logging.getLogger(__name__)
@ -36,7 +36,7 @@ _LOGGER: logging.Logger = logging.getLogger(__name__)
class Core(CoreSysAttributes):
"""Main object of Supervisor."""
def __init__(self, coresys: CoreSys):
def __init__(self, coresys: CoreSys) -> None:
"""Initialize Supervisor object."""
self.coresys: CoreSys = coresys
self._state: CoreState = CoreState.INITIALIZE
@ -91,7 +91,7 @@ class Core(CoreSysAttributes):
"info", {"state": self._state}
)
async def connect(self):
async def connect(self) -> None:
"""Connect Supervisor container."""
# Load information from container
await self.sys_supervisor.load()
@ -120,7 +120,7 @@ class Core(CoreSysAttributes):
self.sys_config.version = self.sys_supervisor.version
await self.sys_config.save_data()
async def setup(self):
async def setup(self) -> None:
"""Start setting up supervisor orchestration."""
await self.set_state(CoreState.SETUP)
@ -216,7 +216,7 @@ class Core(CoreSysAttributes):
# Evaluate the system
await self.sys_resolution.evaluate.evaluate_system()
async def start(self):
async def start(self) -> None:
"""Start Supervisor orchestration."""
await self.set_state(CoreState.STARTUP)
@ -310,7 +310,7 @@ class Core(CoreSysAttributes):
)
_LOGGER.info("Supervisor is up and running")
async def stop(self):
async def stop(self) -> None:
"""Stop a running orchestration."""
# store new last boot / prevent time adjustments
if self.state in (CoreState.RUNNING, CoreState.SHUTDOWN):
@ -358,7 +358,7 @@ class Core(CoreSysAttributes):
_LOGGER.info("Supervisor is down - %d", self.exit_code)
self.sys_loop.stop()
async def shutdown(self, *, remove_homeassistant_container: bool = False):
async def shutdown(self, *, remove_homeassistant_container: bool = False) -> None:
"""Shutdown all running containers in correct order."""
# don't process scheduler anymore
if self.state == CoreState.RUNNING:
@ -382,19 +382,15 @@ class Core(CoreSysAttributes):
if self.state in (CoreState.STOPPING, CoreState.SHUTDOWN):
await self.sys_plugins.shutdown()
async def _update_last_boot(self):
async def _update_last_boot(self) -> None:
"""Update last boot time."""
self.sys_config.last_boot = await self.sys_hardware.helper.last_boot()
if not (last_boot := await self.sys_hardware.helper.last_boot()):
_LOGGER.error("Could not update last boot information!")
return
self.sys_config.last_boot = last_boot
await self.sys_config.save_data()
async def _retrieve_whoami(self, with_ssl: bool) -> WhoamiData | None:
try:
return await retrieve_whoami(self.sys_websession, with_ssl)
except WhoamiSSLError:
_LOGGER.info("Whoami service SSL error")
return None
async def _adjust_system_datetime(self):
async def _adjust_system_datetime(self) -> None:
"""Adjust system time/date on startup."""
# If no timezone is detect or set
# If we are not connected or time sync
@ -406,11 +402,13 @@ class Core(CoreSysAttributes):
# Get Timezone data
try:
data = await self._retrieve_whoami(True)
try:
data = await retrieve_whoami(self.sys_websession, True)
except WhoamiSSLError:
# SSL Date Issue & possible time drift
_LOGGER.info("Whoami service SSL error")
data = await retrieve_whoami(self.sys_websession, False)
# SSL Date Issue & possible time drift
if not data:
data = await self._retrieve_whoami(False)
except WhoamiError as err:
_LOGGER.warning("Can't adjust Time/Date settings: %s", err)
return
@ -426,7 +424,7 @@ class Core(CoreSysAttributes):
await self.sys_host.control.set_datetime(data.dt_utc)
await self.sys_supervisor.check_connectivity()
async def repair(self):
async def repair(self) -> None:
"""Repair system integrity."""
_LOGGER.info("Starting repair of Supervisor Environment")
await self.sys_run_in_executor(self.sys_docker.repair)

View File

@ -62,17 +62,17 @@ _LOGGER: logging.Logger = logging.getLogger(__name__)
class CoreSys:
"""Class that handle all shared data."""
def __init__(self):
def __init__(self) -> None:
"""Initialize coresys."""
# Static attributes protected
self._machine_id: str | None = None
self._machine: str | None = None
# External objects
self._loop: asyncio.BaseEventLoop = asyncio.get_running_loop()
self._loop = asyncio.get_running_loop()
# Global objects
self._config: CoreConfig = CoreConfig()
self._config = CoreConfig()
# Internal objects pointers
self._docker: DockerAPI | None = None
@ -122,6 +122,7 @@ class CoreSys:
if self._websession:
await self._websession.close()
resolver: aiohttp.abc.AbstractResolver
try:
resolver = aiohttp.AsyncResolver(loop=self.loop)
# pylint: disable=protected-access
@ -144,7 +145,7 @@ class CoreSys:
self._websession = session
async def init_machine(self):
async def init_machine(self) -> None:
"""Initialize machine information."""
def _load_machine_id() -> str | None:
@ -188,7 +189,7 @@ class CoreSys:
return UTC
@property
def loop(self) -> asyncio.BaseEventLoop:
def loop(self) -> asyncio.AbstractEventLoop:
"""Return loop object."""
return self._loop
@ -608,8 +609,8 @@ class CoreSys:
self._set_task_context.append(callback)
def run_in_executor(
self, funct: Callable[..., T], *args: tuple[Any], **kwargs: dict[str, Any]
) -> Coroutine[Any, Any, T]:
self, funct: Callable[..., T], *args, **kwargs
) -> asyncio.Future[T]:
"""Add an job to the executor pool."""
if kwargs:
funct = partial(funct, **kwargs)
@ -631,8 +632,8 @@ class CoreSys:
self,
delay: float,
funct: Callable[..., Any],
*args: tuple[Any],
**kwargs: dict[str, Any],
*args,
**kwargs,
) -> asyncio.TimerHandle:
"""Start a task after a delay."""
if kwargs:
@ -644,8 +645,8 @@ class CoreSys:
self,
when: datetime,
funct: Callable[..., Any],
*args: tuple[Any],
**kwargs: dict[str, Any],
*args,
**kwargs,
) -> asyncio.TimerHandle:
"""Start a task at the specified datetime."""
if kwargs:
@ -682,7 +683,7 @@ class CoreSysAttributes:
return self.coresys.dev
@property
def sys_loop(self) -> asyncio.BaseEventLoop:
def sys_loop(self) -> asyncio.AbstractEventLoop:
"""Return loop object."""
return self.coresys.loop
@ -832,7 +833,7 @@ class CoreSysAttributes:
def sys_run_in_executor(
self, funct: Callable[..., T], *args, **kwargs
) -> Coroutine[Any, Any, T]:
) -> asyncio.Future[T]:
"""Add a job to the executor pool."""
return self.coresys.run_in_executor(funct, *args, **kwargs)

View File

@ -117,7 +117,7 @@ class DBusInterfaceProxy(DBusInterface, ABC):
"""Initialize object with already connected dbus object."""
await super().initialize(connected_dbus)
if not self.connected_dbus.properties:
if not self.connected_dbus.supports_properties:
self.disconnect()
raise DBusInterfaceError(
f"D-Bus object {self.object_path} is not usable, introspection is missing required properties interface"

View File

@ -344,7 +344,7 @@ class DockerAddon(DockerInterface):
mounts = [
MOUNT_DEV,
Mount(
type=MountType.BIND,
type=MountType.BIND.value,
source=self.addon.path_extern_data.as_posix(),
target=target_data_path or PATH_PRIVATE_DATA.as_posix(),
read_only=False,
@ -355,7 +355,7 @@ class DockerAddon(DockerInterface):
if MappingType.CONFIG in addon_mapping:
mounts.append(
Mount(
type=MountType.BIND,
type=MountType.BIND.value,
source=self.sys_config.path_extern_homeassistant.as_posix(),
target=addon_mapping[MappingType.CONFIG].path
or PATH_HOMEASSISTANT_CONFIG_LEGACY.as_posix(),
@ -368,7 +368,7 @@ class DockerAddon(DockerInterface):
if self.addon.addon_config_used:
mounts.append(
Mount(
type=MountType.BIND,
type=MountType.BIND.value,
source=self.addon.path_extern_config.as_posix(),
target=addon_mapping[MappingType.ADDON_CONFIG].path
or PATH_PUBLIC_CONFIG.as_posix(),
@ -380,7 +380,7 @@ class DockerAddon(DockerInterface):
if MappingType.HOMEASSISTANT_CONFIG in addon_mapping:
mounts.append(
Mount(
type=MountType.BIND,
type=MountType.BIND.value,
source=self.sys_config.path_extern_homeassistant.as_posix(),
target=addon_mapping[MappingType.HOMEASSISTANT_CONFIG].path
or PATH_HOMEASSISTANT_CONFIG.as_posix(),
@ -393,7 +393,7 @@ class DockerAddon(DockerInterface):
if MappingType.ALL_ADDON_CONFIGS in addon_mapping:
mounts.append(
Mount(
type=MountType.BIND,
type=MountType.BIND.value,
source=self.sys_config.path_extern_addon_configs.as_posix(),
target=addon_mapping[MappingType.ALL_ADDON_CONFIGS].path
or PATH_ALL_ADDON_CONFIGS.as_posix(),
@ -404,7 +404,7 @@ class DockerAddon(DockerInterface):
if MappingType.SSL in addon_mapping:
mounts.append(
Mount(
type=MountType.BIND,
type=MountType.BIND.value,
source=self.sys_config.path_extern_ssl.as_posix(),
target=addon_mapping[MappingType.SSL].path or PATH_SSL.as_posix(),
read_only=addon_mapping[MappingType.SSL].read_only,
@ -414,7 +414,7 @@ class DockerAddon(DockerInterface):
if MappingType.ADDONS in addon_mapping:
mounts.append(
Mount(
type=MountType.BIND,
type=MountType.BIND.value,
source=self.sys_config.path_extern_addons_local.as_posix(),
target=addon_mapping[MappingType.ADDONS].path
or PATH_LOCAL_ADDONS.as_posix(),
@ -425,7 +425,7 @@ class DockerAddon(DockerInterface):
if MappingType.BACKUP in addon_mapping:
mounts.append(
Mount(
type=MountType.BIND,
type=MountType.BIND.value,
source=self.sys_config.path_extern_backup.as_posix(),
target=addon_mapping[MappingType.BACKUP].path
or PATH_BACKUP.as_posix(),
@ -436,7 +436,7 @@ class DockerAddon(DockerInterface):
if MappingType.SHARE in addon_mapping:
mounts.append(
Mount(
type=MountType.BIND,
type=MountType.BIND.value,
source=self.sys_config.path_extern_share.as_posix(),
target=addon_mapping[MappingType.SHARE].path
or PATH_SHARE.as_posix(),
@ -448,7 +448,7 @@ class DockerAddon(DockerInterface):
if MappingType.MEDIA in addon_mapping:
mounts.append(
Mount(
type=MountType.BIND,
type=MountType.BIND.value,
source=self.sys_config.path_extern_media.as_posix(),
target=addon_mapping[MappingType.MEDIA].path
or PATH_MEDIA.as_posix(),
@ -466,7 +466,7 @@ class DockerAddon(DockerInterface):
continue
mounts.append(
Mount(
type=MountType.BIND,
type=MountType.BIND.value,
source=gpio_path,
target=gpio_path,
read_only=False,
@ -477,7 +477,7 @@ class DockerAddon(DockerInterface):
if self.addon.with_devicetree:
mounts.append(
Mount(
type=MountType.BIND,
type=MountType.BIND.value,
source="/sys/firmware/devicetree/base",
target="/device-tree",
read_only=True,
@ -492,7 +492,7 @@ class DockerAddon(DockerInterface):
if self.addon.with_kernel_modules:
mounts.append(
Mount(
type=MountType.BIND,
type=MountType.BIND.value,
source="/lib/modules",
target="/lib/modules",
read_only=True,
@ -511,19 +511,19 @@ class DockerAddon(DockerInterface):
if self.addon.with_audio:
mounts += [
Mount(
type=MountType.BIND,
type=MountType.BIND.value,
source=self.addon.path_extern_pulse.as_posix(),
target="/etc/pulse/client.conf",
read_only=True,
),
Mount(
type=MountType.BIND,
type=MountType.BIND.value,
source=self.sys_plugins.audio.path_extern_pulse.as_posix(),
target="/run/audio",
read_only=True,
),
Mount(
type=MountType.BIND,
type=MountType.BIND.value,
source=self.sys_plugins.audio.path_extern_asound.as_posix(),
target="/etc/asound.conf",
read_only=True,
@ -534,13 +534,13 @@ class DockerAddon(DockerInterface):
if self.addon.with_journald:
mounts += [
Mount(
type=MountType.BIND,
type=MountType.BIND.value,
source=SYSTEMD_JOURNAL_PERSISTENT.as_posix(),
target=SYSTEMD_JOURNAL_PERSISTENT.as_posix(),
read_only=True,
),
Mount(
type=MountType.BIND,
type=MountType.BIND.value,
source=SYSTEMD_JOURNAL_VOLATILE.as_posix(),
target=SYSTEMD_JOURNAL_VOLATILE.as_posix(),
read_only=True,

View File

@ -47,7 +47,7 @@ class DockerAudio(DockerInterface, CoreSysAttributes):
mounts = [
MOUNT_DEV,
Mount(
type=MountType.BIND,
type=MountType.BIND.value,
source=self.sys_config.path_extern_audio.as_posix(),
target=PATH_PRIVATE_DATA.as_posix(),
read_only=False,

View File

@ -74,24 +74,26 @@ ENV_TOKEN_OLD = "HASSIO_TOKEN"
LABEL_MANAGED = "supervisor_managed"
MOUNT_DBUS = Mount(
type=MountType.BIND, source="/run/dbus", target="/run/dbus", read_only=True
type=MountType.BIND.value, source="/run/dbus", target="/run/dbus", read_only=True
)
MOUNT_DEV = Mount(
type=MountType.BIND.value, source="/dev", target="/dev", read_only=True
)
MOUNT_DEV = Mount(type=MountType.BIND, source="/dev", target="/dev", read_only=True)
MOUNT_DEV.setdefault("BindOptions", {})["ReadOnlyNonRecursive"] = True
MOUNT_DOCKER = Mount(
type=MountType.BIND,
type=MountType.BIND.value,
source="/run/docker.sock",
target="/run/docker.sock",
read_only=True,
)
MOUNT_MACHINE_ID = Mount(
type=MountType.BIND,
type=MountType.BIND.value,
source=MACHINE_ID.as_posix(),
target=MACHINE_ID.as_posix(),
read_only=True,
)
MOUNT_UDEV = Mount(
type=MountType.BIND, source="/run/udev", target="/run/udev", read_only=True
type=MountType.BIND.value, source="/run/udev", target="/run/udev", read_only=True
)
PATH_PRIVATE_DATA = PurePath("/data")

View File

@ -48,7 +48,7 @@ class DockerDNS(DockerInterface, CoreSysAttributes):
environment={ENV_TIME: self.sys_timezone},
mounts=[
Mount(
type=MountType.BIND,
type=MountType.BIND.value,
source=self.sys_config.path_extern_dns.as_posix(),
target="/config",
read_only=False,

View File

@ -99,7 +99,7 @@ class DockerHomeAssistant(DockerInterface):
MOUNT_UDEV,
# HA config folder
Mount(
type=MountType.BIND,
type=MountType.BIND.value,
source=self.sys_config.path_extern_homeassistant.as_posix(),
target=PATH_PUBLIC_CONFIG.as_posix(),
read_only=False,
@ -112,20 +112,20 @@ class DockerHomeAssistant(DockerInterface):
[
# All other folders
Mount(
type=MountType.BIND,
type=MountType.BIND.value,
source=self.sys_config.path_extern_ssl.as_posix(),
target=PATH_SSL.as_posix(),
read_only=True,
),
Mount(
type=MountType.BIND,
type=MountType.BIND.value,
source=self.sys_config.path_extern_share.as_posix(),
target=PATH_SHARE.as_posix(),
read_only=False,
propagation=PropagationMode.RSLAVE.value,
),
Mount(
type=MountType.BIND,
type=MountType.BIND.value,
source=self.sys_config.path_extern_media.as_posix(),
target=PATH_MEDIA.as_posix(),
read_only=False,
@ -133,19 +133,19 @@ class DockerHomeAssistant(DockerInterface):
),
# Configuration audio
Mount(
type=MountType.BIND,
type=MountType.BIND.value,
source=self.sys_homeassistant.path_extern_pulse.as_posix(),
target="/etc/pulse/client.conf",
read_only=True,
),
Mount(
type=MountType.BIND,
type=MountType.BIND.value,
source=self.sys_plugins.audio.path_extern_pulse.as_posix(),
target="/run/audio",
read_only=True,
),
Mount(
type=MountType.BIND,
type=MountType.BIND.value,
source=self.sys_plugins.audio.path_extern_asound.as_posix(),
target="/etc/asound.conf",
read_only=True,
@ -218,19 +218,19 @@ class DockerHomeAssistant(DockerInterface):
stderr=True,
mounts=[
Mount(
type=MountType.BIND,
type=MountType.BIND.value,
source=self.sys_config.path_extern_homeassistant.as_posix(),
target="/config",
read_only=False,
),
Mount(
type=MountType.BIND,
type=MountType.BIND.value,
source=self.sys_config.path_extern_ssl.as_posix(),
target="/ssl",
read_only=True,
),
Mount(
type=MountType.BIND,
type=MountType.BIND.value,
source=self.sys_config.path_extern_share.as_posix(),
target="/share",
read_only=False,

View File

@ -57,7 +57,7 @@ class DockerNetwork:
def __init__(self, docker_client: docker.DockerClient):
"""Initialize internal Supervisor network."""
self.docker: docker.DockerClient = docker_client
self._network: docker.models.networks.Network | None = None
self._network: docker.models.networks.Network
async def post_init(self, enable_ipv6: bool = False) -> Self:
"""Post init actions that must be done in event loop."""
@ -154,7 +154,7 @@ class DockerNetwork:
network_params[ATTR_ENABLE_IPV6] = enable_ipv6
try:
self._network = self.docker.networks.create(**network_params)
self._network = self.docker.networks.create(**network_params) # type: ignore
except docker.errors.APIError as err:
raise DockerError(
f"Can't create Supervisor network: {err}", _LOGGER.error

View File

@ -35,6 +35,7 @@ from ..const import (
FILE_HASSIO_HOMEASSISTANT,
BusEvent,
IngressSessionDataUser,
IngressSessionDataUserDict,
)
from ..coresys import CoreSys, CoreSysAttributes
from ..exceptions import (
@ -557,18 +558,11 @@ class HomeAssistant(FileConfiguration, CoreSysAttributes):
async def get_users(self) -> list[IngressSessionDataUser]:
"""Get list of all configured users."""
list_of_users: (
list[dict[str, Any]] | None
list[IngressSessionDataUserDict] | None
) = await self.sys_homeassistant.websocket.async_send_command(
{ATTR_TYPE: "config/auth/list"}
)
if list_of_users:
return [
IngressSessionDataUser(
id=data["id"],
username=data.get("username"),
display_name=data.get("name"),
)
for data in list_of_users
]
return [IngressSessionDataUser.from_dict(data) for data in list_of_users]
return []

View File

@ -138,8 +138,8 @@ class NetworkManager(CoreSysAttributes):
]
)
self.sys_dbus.network.dbus.properties.on_properties_changed(
self._check_connectivity_changed
self.sys_dbus.network.dbus.properties.on(
"properties_changed", self._check_connectivity_changed
)
async def _check_connectivity_changed(

View File

@ -12,6 +12,7 @@ from .const import (
ATTR_SESSION_DATA,
FILE_HASSIO_INGRESS,
IngressSessionData,
IngressSessionDataDict,
)
from .coresys import CoreSys, CoreSysAttributes
from .utils import check_port
@ -49,7 +50,7 @@ class Ingress(FileConfiguration, CoreSysAttributes):
return self._data[ATTR_SESSION]
@property
def sessions_data(self) -> dict[str, dict[str, str | None]]:
def sessions_data(self) -> dict[str, IngressSessionDataDict]:
"""Return sessions_data."""
return self._data[ATTR_SESSION_DATA]
@ -89,7 +90,7 @@ class Ingress(FileConfiguration, CoreSysAttributes):
now = utcnow()
sessions = {}
sessions_data: dict[str, dict[str, str | None]] = {}
sessions_data: dict[str, IngressSessionDataDict] = {}
for session, valid in self.sessions.items():
# check if timestamp valid, to avoid crash on malformed timestamp
try:
@ -118,7 +119,8 @@ class Ingress(FileConfiguration, CoreSysAttributes):
# Read all ingress token and build a map
for addon in self.addons:
self.tokens[addon.ingress_token] = addon.slug
if addon.ingress_token:
self.tokens[addon.ingress_token] = addon.slug
def create_session(self, data: IngressSessionData | None = None) -> str:
"""Create new session."""
@ -141,7 +143,7 @@ class Ingress(FileConfiguration, CoreSysAttributes):
try:
valid_until = utc_from_timestamp(self.sessions[session])
except OverflowError:
self.sessions[session] = utcnow() + timedelta(minutes=15)
self.sessions[session] = (utcnow() + timedelta(minutes=15)).timestamp()
return True
# Is still valid?

View File

@ -60,7 +60,7 @@ class EvaluateContainer(EvaluateBase):
"""Return a set of all known images."""
return {
self.sys_homeassistant.image,
self.sys_supervisor.image,
self.sys_supervisor.image or self.sys_supervisor.default_image,
*(plugin.image for plugin in self.sys_plugins.all_plugins if plugin.image),
*(addon.image for addon in self.sys_addons.installed if addon.image),
}

View File

@ -89,7 +89,7 @@ class StoreManager(CoreSysAttributes, FileConfiguration):
"""Update add-ons from repository and reload list."""
# Make a copy to prevent race with other tasks
repositories = [repository] if repository else self.all.copy()
results: list[bool | Exception] = await asyncio.gather(
results: list[bool | BaseException] = await asyncio.gather(
*[repo.update() for repo in repositories], return_exceptions=True
)

View File

@ -47,7 +47,7 @@ def _read_addon_translations(addon_path: Path) -> dict:
Should be run in the executor.
"""
translations_dir = addon_path / "translations"
translations = {}
translations: dict[str, Any] = {}
if not translations_dir.exists():
return translations
@ -144,7 +144,7 @@ class StoreData(CoreSysAttributes):
self.addons = addons
async def _find_addon_configs(
self, path: Path, repository: dict
self, path: Path, repository: str
) -> list[Path] | None:
"""Find add-ons in the path."""

View File

@ -31,7 +31,9 @@ class GitRepo(CoreSysAttributes):
self.path: Path = path
self.lock: asyncio.Lock = asyncio.Lock()
self.data: dict[str, str] = RE_REPOSITORY.match(url).groupdict()
if not (repository := RE_REPOSITORY.match(url)):
raise ValueError(f"Invalid url provided for repository GitRepo: {url}")
self.data: dict[str, str] = repository.groupdict()
def __repr__(self) -> str:
"""Return internal representation."""
@ -102,7 +104,10 @@ class GitRepo(CoreSysAttributes):
)
self.repo = await self.sys_run_in_executor(
ft.partial(
git.Repo.clone_from, self.url, str(self.path), **git_args
git.Repo.clone_from,
self.url,
str(self.path),
**git_args, # type: ignore
)
)
@ -124,10 +129,10 @@ class GitRepo(CoreSysAttributes):
"""Pull Git add-on repo."""
if self.lock.locked():
_LOGGER.warning("There is already a task in progress")
return
return False
if self.repo is None:
_LOGGER.warning("No valid repository for %s", self.url)
return
return False
async with self.lock:
_LOGGER.info("Update add-on %s repository from %s", self.path, self.url)
@ -146,7 +151,7 @@ class GitRepo(CoreSysAttributes):
await self.sys_run_in_executor(
ft.partial(
self.repo.remotes.origin.fetch,
**{"update-shallow": True, "depth": 1},
**{"update-shallow": True, "depth": 1}, # type: ignore
)
)

View File

@ -2,6 +2,7 @@
import logging
from pathlib import Path
from typing import cast
import voluptuous as vol
@ -81,7 +82,7 @@ class Repository(CoreSysAttributes):
Must be run in executor.
"""
if self.type != StoreType.GIT:
if not self.git or self.type == StoreType.CORE:
return True
# If exists?
@ -119,7 +120,7 @@ class Repository(CoreSysAttributes):
if not await self.sys_run_in_executor(self.validate):
return False
if self.type != StoreType.LOCAL:
if self.git:
return await self.git.pull()
# Check local modifications
@ -139,7 +140,7 @@ class Repository(CoreSysAttributes):
async def remove(self) -> None:
"""Remove add-on repository."""
if self.type != StoreType.GIT:
if not self.git or self.type == StoreType.CORE:
raise StoreError("Can't remove built-in repositories!", _LOGGER.error)
await self.git.remove()
await cast(GitRepoCustom, self.git).remove()

View File

@ -106,17 +106,22 @@ class Supervisor(CoreSysAttributes):
return AwesomeVersion(SUPERVISOR_VERSION)
@property
def latest_version(self) -> AwesomeVersion:
"""Return last available version of Home Assistant."""
def latest_version(self) -> AwesomeVersion | None:
"""Return last available version of ."""
return self.sys_updater.version_supervisor
@property
def image(self) -> str:
"""Return image name of Home Assistant container."""
def default_image(self) -> str:
"""Return the default image for this system."""
return f"ghcr.io/home-assistant/{self.sys_arch.supervisor}-hassio-supervisor"
@property
def image(self) -> str | None:
"""Return image name of Supervisor container."""
return self.instance.image
@property
def arch(self) -> str:
def arch(self) -> str | None:
"""Return arch of the Supervisor container."""
return self.instance.arch
@ -192,9 +197,9 @@ class Supervisor(CoreSysAttributes):
async def update(self, version: AwesomeVersion | None = None) -> None:
"""Update Supervisor version."""
version = version or self.latest_version
version = version or self.latest_version or self.version
if version == self.sys_supervisor.version:
if version == self.version:
raise SupervisorUpdateError(
f"Version {version!s} is already installed", _LOGGER.warning
)

View File

@ -27,7 +27,7 @@ from .const import (
BusEvent,
UpdateChannel,
)
from .coresys import CoreSysAttributes
from .coresys import CoreSys, CoreSysAttributes
from .exceptions import (
CodeNotaryError,
CodeNotaryUntrusted,
@ -45,7 +45,7 @@ _LOGGER: logging.Logger = logging.getLogger(__name__)
class Updater(FileConfiguration, CoreSysAttributes):
"""Fetch last versions from version.json."""
def __init__(self, coresys):
def __init__(self, coresys: CoreSys) -> None:
"""Initialize updater."""
super().__init__(FILE_HASSIO_UPDATER, SCHEMA_UPDATER_CONFIG)
self.coresys = coresys

View File

@ -56,7 +56,7 @@ async def check_port(address: IPv4Address, port: int) -> bool:
return True
def check_exception_chain(err: Exception, object_type: Any) -> bool:
def check_exception_chain(err: BaseException, object_type: Any) -> bool:
"""Check if exception chain include sub exception.
It's not full recursive because we need mostly only access to the latest.
@ -70,7 +70,7 @@ def check_exception_chain(err: Exception, object_type: Any) -> bool:
return check_exception_chain(err.__context__, object_type)
def get_message_from_exception_chain(err: Exception) -> str:
def get_message_from_exception_chain(err: BaseException) -> str:
"""Get the first message from the exception chain."""
if str(err):
return str(err)
@ -119,8 +119,8 @@ def remove_folder_with_excludes(
Must be run in executor.
"""
with TemporaryDirectory(dir=tmp_dir) as temp_path:
temp_path = Path(temp_path)
with TemporaryDirectory(dir=tmp_dir) as temp_path_str:
temp_path = Path(temp_path_str)
moved_files: list[Path] = []
for item in folder.iterdir():
if any(item.match(exclude) for exclude in excludes):

View File

@ -87,13 +87,15 @@ class FileConfiguration:
if not self._file:
raise RuntimeError("Path to config file must be set!")
def _read_data() -> dict[str, Any]:
if self._file.is_file():
def _read_data(file: Path) -> dict[str, Any]:
if file.is_file():
with suppress(ConfigurationFileError):
return read_json_or_yaml_file(self._file)
return read_json_or_yaml_file(file)
return _DEFAULT
self._data = await asyncio.get_running_loop().run_in_executor(None, _read_data)
self._data = await asyncio.get_running_loop().run_in_executor(
None, _read_data, self._file
)
# Validate
try:

View File

@ -3,9 +3,9 @@
from __future__ import annotations
import asyncio
from collections.abc import Awaitable, Callable, Coroutine
from collections.abc import Awaitable, Callable
import logging
from typing import Any, cast
from typing import Any, Protocol, cast
from dbus_fast import (
ErrorType,
@ -46,6 +46,20 @@ DBUS_INTERFACE_PROPERTIES: str = "org.freedesktop.DBus.Properties"
DBUS_METHOD_GETALL: str = "org.freedesktop.DBus.Properties.GetAll"
class GetWithUnpack(Protocol):
"""Protocol class for dbus get signature."""
def __call__(self, *, unpack_variants: bool = True) -> Awaitable[Any]:
"""Signature for dbus get unpack kwarg."""
class UpdatePropertiesCallback(Protocol):
"""Protocol class for update properties callback."""
def __call__(self, changed: dict[str, Any] | None = None) -> Awaitable[None]:
"""Signature for an update properties callback function."""
class DBus:
"""DBus handler."""
@ -216,10 +230,17 @@ class DBus:
return self._proxy_obj is not None
@property
def properties(self) -> DBusCallWrapper | None:
def supports_properties(self) -> bool:
"""Return true if properties interface supported by DBus object."""
return DBUS_INTERFACE_PROPERTIES in self._proxies
@property
def properties(self) -> DBusCallWrapper:
"""Get properties proxy interface."""
if DBUS_INTERFACE_PROPERTIES not in self._proxies:
return None
if not self.supports_properties:
raise DBusInterfaceError(
f"DBus Object does not have interface {DBUS_INTERFACE_PROPERTIES}"
)
return DBusCallWrapper(self, DBUS_INTERFACE_PROPERTIES)
@property
@ -231,16 +252,12 @@ class DBus:
async def get_properties(self, interface: str) -> dict[str, Any]:
"""Read all properties from interface."""
if not self.properties:
raise DBusInterfaceError(
f"DBus Object does not have interface {DBUS_INTERFACE_PROPERTIES}"
)
return await self.properties.call_get_all(interface)
return await self.properties.call("get_all", interface)
def sync_property_changes(
self,
interface: str,
update: Callable[[dict[str, Any]], Coroutine[None]],
update: UpdatePropertiesCallback,
) -> Callable:
"""Sync property changes for interface with cache.
@ -249,7 +266,7 @@ class DBus:
async def sync_property_change(
prop_interface: str, changed: dict[str, Variant], invalidated: list[str]
):
) -> None:
"""Sync property changes to cache."""
if interface != prop_interface:
return
@ -267,12 +284,12 @@ class DBus:
else:
await update(changed)
self.properties.on_properties_changed(sync_property_change)
self.properties.on("properties_changed", sync_property_change)
return sync_property_change
def stop_sync_property_changes(self, sync_property_change: Callable):
"""Stop syncing property changes with cache."""
self.properties.off_properties_changed(sync_property_change)
self.properties.off("properties_changed", sync_property_change)
def disconnect(self):
"""Remove all active signal listeners."""
@ -356,10 +373,11 @@ class DBusCallWrapper:
if not self._proxy:
return DBusCallWrapper(self.dbus, f"{self.interface}.{name}")
dbus_proxy = self._proxy
dbus_parts = name.split("_", 1)
dbus_type = dbus_parts[0]
if not hasattr(self._proxy, name):
if not hasattr(dbus_proxy, name):
message = f"{name} does not exist in D-Bus interface {self.interface}!"
if dbus_type == "call":
raise DBusInterfaceMethodError(message, _LOGGER.error)
@ -383,7 +401,7 @@ class DBusCallWrapper:
if dbus_type == "on":
def _on_signal(callback: Callable):
getattr(self._proxy, name)(callback, unpack_variants=True)
getattr(dbus_proxy, name)(callback, unpack_variants=True)
# pylint: disable=protected-access
self.dbus._add_signal_monitor(self.interface, dbus_name, callback)
@ -392,7 +410,7 @@ class DBusCallWrapper:
return _on_signal
def _off_signal(callback: Callable):
getattr(self._proxy, name)(callback, unpack_variants=True)
getattr(dbus_proxy, name)(callback, unpack_variants=True)
# pylint: disable=protected-access
if (
@ -421,7 +439,7 @@ class DBusCallWrapper:
def _method_wrapper(*args, unpack_variants: bool = True) -> Awaitable:
return DBus.call_dbus(
self._proxy, name, *args, unpack_variants=unpack_variants
dbus_proxy, name, *args, unpack_variants=unpack_variants
)
return _method_wrapper
@ -429,7 +447,7 @@ class DBusCallWrapper:
elif dbus_type == "set":
def _set_wrapper(*args) -> Awaitable:
return DBus.call_dbus(self._proxy, name, *args, unpack_variants=False)
return DBus.call_dbus(dbus_proxy, name, *args, unpack_variants=False)
return _set_wrapper
@ -448,7 +466,7 @@ class DBusCallWrapper:
def get(self, name: str, *, unpack_variants: bool = True) -> Awaitable[Any]:
"""Get a dbus property value."""
return cast(Callable[[bool], Awaitable[Any]], self._dbus_action(f"get_{name}"))(
return cast(GetWithUnpack, self._dbus_action(f"get_{name}"))(
unpack_variants=unpack_variants
)

View File

@ -3,7 +3,6 @@
import asyncio
from functools import partial
import logging
from typing import Any
from aiohttp.web_exceptions import HTTPBadGateway, HTTPServiceUnavailable
import sentry_sdk
@ -56,28 +55,6 @@ def init_sentry(coresys: CoreSys) -> None:
)
def capture_event(event: dict[str, Any], only_once: str | None = None):
"""Capture an event and send to sentry.
Must be called in executor.
"""
if sentry_sdk.is_initialized():
if only_once and only_once not in only_once_events:
only_once_events.add(only_once)
sentry_sdk.capture_event(event)
async def async_capture_event(event: dict[str, Any], only_once: str | None = None):
"""Capture an event and send to sentry.
Safe to call from event loop.
"""
if sentry_sdk.is_initialized():
await asyncio.get_running_loop().run_in_executor(
None, capture_event, event, only_once
)
def capture_exception(err: BaseException) -> None:
"""Capture an exception and send to sentry.

View File

@ -107,17 +107,17 @@ async def journal_logs_reader(
# followed by a newline as separator to the next field.
if not data.endswith(b"\n"):
raise MalformedBinaryEntryError(
f"Failed parsing binary entry {data}"
f"Failed parsing binary entry {data.decode('utf-8', errors='replace')}"
)
name = name.decode("utf-8")
if name not in formatter_.required_fields:
field_name = name.decode("utf-8")
if field_name not in formatter_.required_fields:
# we must read to the end of the entry in the stream, so we can
# only continue the loop here
continue
# strip \n for simple fields before decoding
entries[name] = data[:-1].decode("utf-8")
entries[field_name] = data[:-1].decode("utf-8")
def _parse_boot_json(boot_json_bytes: bytes) -> tuple[int, str]:

View File

@ -9,7 +9,7 @@ from yaml import YAMLError, dump, load
try:
from yaml import CDumper as Dumper, CSafeLoader as SafeLoader
except ImportError:
from yaml import Dumper, SafeLoader
from yaml import Dumper, SafeLoader # type: ignore
from ..exceptions import YamlFileError

View File

@ -119,16 +119,45 @@ async def test_list_users(
]
@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
@pytest.mark.parametrize(
("field", "api_client"),
[("username", TEST_ADDON_SLUG), ("user", TEST_ADDON_SLUG)],
indirect=["api_client"],
)
async def test_auth_json_success(
api_client: TestClient, mock_check_login: AsyncMock, install_addon_ssh: Addon
api_client: TestClient,
mock_check_login: AsyncMock,
install_addon_ssh: Addon,
field: str,
):
"""Test successful JSON auth."""
mock_check_login.return_value = True
resp = await api_client.post("/auth", json={"username": "test", "password": "pass"})
resp = await api_client.post("/auth", json={field: "test", "password": "pass"})
assert resp.status == 200
@pytest.mark.parametrize(
("user", "password", "message", "api_client"),
[
(None, "password", "None as username is not supported!", TEST_ADDON_SLUG),
("user", None, "None as password is not supported!", TEST_ADDON_SLUG),
],
indirect=["api_client"],
)
async def test_auth_json_failure_none(
api_client: TestClient,
install_addon_ssh: Addon,
user: str | None,
password: str | None,
message: str,
):
"""Test failed JSON auth with none user or password."""
resp = await api_client.post("/auth", json={"username": user, "password": password})
assert resp.status == 400
body = await resp.json()
assert body["message"] == message
@pytest.mark.parametrize("api_client", [TEST_ADDON_SLUG], indirect=True)
async def test_auth_json_invalid_credentials(
api_client: TestClient, mock_check_login: AsyncMock, install_addon_ssh: Addon

View File

@ -762,16 +762,6 @@ async def capture_exception() -> Mock:
yield capture_exception
@pytest.fixture
async def capture_event() -> Mock:
"""Mock capture event for testing."""
with (
patch("supervisor.utils.sentry.sentry_sdk.is_initialized", return_value=True),
patch("supervisor.utils.sentry.sentry_sdk.capture_event") as capture_event,
):
yield capture_event
@pytest.fixture
async def os_available(request: pytest.FixtureRequest) -> None:
"""Mock os as available."""