Fix mypy issues in plugins and resolution (#5946)

* Fix mypy issues in plugins

* Fix mypy issues in resolution module

* fix misses in resolution check

* Fix signatures on evaluate methods

* nitpick fix suggestions
This commit is contained in:
Mike Degatano 2025-06-16 14:12:47 -04:00 committed by GitHub
parent 1fe6f8ad99
commit 0e8ace949a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
50 changed files with 184 additions and 106 deletions

View File

@ -67,6 +67,10 @@ class AddonManager(CoreSysAttributes):
return self.store.get(addon_slug)
return None
def get_local_only(self, addon_slug: str) -> Addon | None:
"""Return an installed add-on from slug."""
return self.local.get(addon_slug)
def from_token(self, token: str) -> Addon | None:
"""Return an add-on from Supervisor token."""
for addon in self.installed:

View File

@ -1,7 +1,7 @@
"""Init file for Supervisor network RESTful API."""
import logging
from typing import Any, cast
from typing import Any
from aiohttp import web
import voluptuous as vol
@ -56,8 +56,8 @@ class APIDiscovery(CoreSysAttributes):
}
for message in self.sys_discovery.list_messages
if (
discovered := cast(
Addon, self.sys_addons.get(message.addon, local_only=True)
discovered := self.sys_addons.get_local_only(
message.addon,
)
)
and discovered.state == AddonState.STARTED

View File

@ -126,9 +126,7 @@ class APIStore(CoreSysAttributes):
"""Generate addon information."""
installed = (
cast(Addon, self.sys_addons.get(addon.slug, local_only=True))
if addon.is_installed
else None
self.sys_addons.get_local_only(addon.slug) if addon.is_installed else None
)
data = {

View File

@ -35,7 +35,7 @@ class Ingress(FileConfiguration, CoreSysAttributes):
"""Return addon they have this ingress token."""
if token not in self.tokens:
return None
return self.sys_addons.get(self.tokens[token], local_only=True)
return self.sys_addons.get_local_only(self.tokens[token])
def get_session_data(self, session_id: str) -> IngressSessionData | None:
"""Return complementary data of current session or None."""

View File

@ -63,7 +63,11 @@ class PluginBase(ABC, FileConfiguration, CoreSysAttributes):
def need_update(self) -> bool:
"""Return True if an update is available."""
try:
return self.version < self.latest_version
return (
self.version is not None
and self.latest_version is not None
and self.version < self.latest_version
)
except (AwesomeVersionException, TypeError):
return False
@ -153,6 +157,10 @@ class PluginBase(ABC, FileConfiguration, CoreSysAttributes):
async def start(self) -> None:
"""Start system plugin."""
@abstractmethod
async def stop(self) -> None:
"""Stop system plugin."""
async def load(self) -> None:
"""Load system plugin."""
self.start_watchdog()

View File

@ -6,6 +6,7 @@ Code: https://github.com/home-assistant/plugin-cli
from collections.abc import Awaitable
import logging
import secrets
from typing import cast
from awesomeversion import AwesomeVersion
@ -55,7 +56,7 @@ class PluginCli(PluginBase):
@property
def supervisor_token(self) -> str:
"""Return an access token for the Supervisor API."""
return self._data.get(ATTR_ACCESS_TOKEN)
return cast(str, self._data[ATTR_ACCESS_TOKEN])
@Job(
name="plugin_cli_update",

View File

@ -71,8 +71,8 @@ class PluginDns(PluginBase):
self.slug = "dns"
self.coresys: CoreSys = coresys
self.instance: DockerDNS = DockerDNS(coresys)
self.resolv_template: jinja2.Template | None = None
self.hosts_template: jinja2.Template | None = None
self._resolv_template: jinja2.Template | None = None
self._hosts_template: jinja2.Template | None = None
self._hosts: list[HostEntry] = []
self._loop: bool = False
@ -147,11 +147,25 @@ class PluginDns(PluginBase):
"""Set fallback DNS enabled."""
self._data[ATTR_FALLBACK] = value
@property
def hosts_template(self) -> jinja2.Template:
"""Get hosts jinja template."""
if not self._hosts_template:
raise RuntimeError("Hosts template not set!")
return self._hosts_template
@property
def resolv_template(self) -> jinja2.Template:
"""Get resolv jinja template."""
if not self._resolv_template:
raise RuntimeError("Resolv template not set!")
return self._resolv_template
async def load(self) -> None:
"""Load DNS setup."""
# Initialize CoreDNS Template
try:
self.resolv_template = jinja2.Template(
self._resolv_template = jinja2.Template(
await self.sys_run_in_executor(RESOLV_TMPL.read_text, encoding="utf-8")
)
except OSError as err:
@ -162,7 +176,7 @@ class PluginDns(PluginBase):
_LOGGER.error("Can't read resolve.tmpl: %s", err)
try:
self.hosts_template = jinja2.Template(
self._hosts_template = jinja2.Template(
await self.sys_run_in_executor(HOSTS_TMPL.read_text, encoding="utf-8")
)
except OSError as err:
@ -176,7 +190,9 @@ class PluginDns(PluginBase):
await super().load()
# Update supervisor
await self._write_resolv(HOST_RESOLV)
# Resolv template should always be set but just in case don't fail load
if self._resolv_template:
await self._write_resolv(HOST_RESOLV)
# Reinitializing aiohttp.ClientSession after DNS setup makes sure that
# aiodns is using the right DNS servers (see #5857).
@ -428,12 +444,6 @@ class PluginDns(PluginBase):
async def _write_resolv(self, resolv_conf: Path) -> None:
"""Update/Write resolv.conf file."""
if not self.resolv_template:
_LOGGER.warning(
"Resolv template is missing, cannot write/update %s", resolv_conf
)
return
nameservers = [str(self.sys_docker.network.dns), "127.0.0.11"]
# Read resolv config

View File

@ -5,6 +5,7 @@ Code: https://github.com/home-assistant/plugin-observer
import logging
import secrets
from typing import cast
import aiohttp
from awesomeversion import AwesomeVersion
@ -60,7 +61,7 @@ class PluginObserver(PluginBase):
@property
def supervisor_token(self) -> str:
"""Return an access token for the Observer API."""
return self._data.get(ATTR_ACCESS_TOKEN)
return cast(str, self._data[ATTR_ACCESS_TOKEN])
@Job(
name="plugin_observer_update",
@ -90,6 +91,10 @@ class PluginObserver(PluginBase):
_LOGGER.error("Can't start observer plugin")
raise ObserverError() from err
async def stop(self) -> None:
"""Raise. Supervisor should not stop observer."""
raise RuntimeError("Stopping observer without a restart is not supported!")
async def stats(self) -> DockerStats:
"""Return stats of observer."""
try:

View File

@ -67,10 +67,11 @@ class CheckAddonPwned(CheckBase):
@Job(name="check_addon_pwned_approve", conditions=[JobCondition.INTERNET_SYSTEM])
async def approve_check(self, reference: str | None = None) -> bool:
"""Approve check if it is affected by issue."""
addon = self.sys_addons.get(reference)
if not reference:
return False
# Uninstalled
if not addon or not addon.is_installed:
if not (addon := self.sys_addons.get_local_only(reference)):
return False
# Not in use anymore

View File

@ -29,9 +29,11 @@ class CheckDetachedAddonMissing(CheckBase):
async def approve_check(self, reference: str | None = None) -> bool:
"""Approve check if it is affected by issue."""
return (
addon := self.sys_addons.get(reference, local_only=True)
) and addon.is_detached
if not reference:
return False
addon = self.sys_addons.get_local_only(reference)
return addon is not None and addon.is_detached
@property
def issue(self) -> IssueType:

View File

@ -27,9 +27,11 @@ class CheckDetachedAddonRemoved(CheckBase):
async def approve_check(self, reference: str | None = None) -> bool:
"""Approve check if it is affected by issue."""
return (
addon := self.sys_addons.get(reference, local_only=True)
) and addon.is_detached
if not reference:
return False
addon = self.sys_addons.get_local_only(reference)
return addon is not None and addon.is_detached
@property
def issue(self) -> IssueType:

View File

@ -35,6 +35,9 @@ class CheckDisabledDataDisk(CheckBase):
async def approve_check(self, reference: str | None = None) -> bool:
"""Approve check if it is affected by issue."""
if not reference:
return False
resolved = await self.sys_dbus.udisks2.resolve_device(
DeviceSpecification(path=Path(reference))
)
@ -43,7 +46,7 @@ class CheckDisabledDataDisk(CheckBase):
def _is_disabled_data_disk(self, block_device: UDisks2Block) -> bool:
"""Return true if filesystem block device has name indicating it was disabled by OS."""
return (
block_device.filesystem
block_device.filesystem is not None
and block_device.id_label == FILESYSTEM_LABEL_DISABLED_DATA_DISK
)

View File

@ -2,6 +2,7 @@
import asyncio
from datetime import timedelta
from typing import Literal
from aiodns import DNSResolver
from aiodns.error import DNSError
@ -16,7 +17,7 @@ from .base import CheckBase
async def check_server(
loop: asyncio.AbstractEventLoop, server: str, qtype: str
loop: asyncio.AbstractEventLoop, server: str, qtype: Literal["A"] | Literal["AAAA"]
) -> None:
"""Check a DNS server and report issues."""
ip_addr = server[6:] if server.startswith("dns://") else server
@ -54,13 +55,15 @@ class CheckDNSServer(CheckBase):
*[check_server(self.sys_loop, server, "A") for server in dns_servers],
return_exceptions=True,
)
for i in (r for r in range(len(results)) if isinstance(results[r], DNSError)):
self.sys_resolution.create_issue(
IssueType.DNS_SERVER_FAILED,
ContextType.DNS_SERVER,
reference=dns_servers[i],
)
await async_capture_exception(results[i])
# pylint: disable-next=consider-using-enumerate
for i in range(len(results)):
if isinstance(result := results[i], DNSError):
self.sys_resolution.create_issue(
IssueType.DNS_SERVER_FAILED,
ContextType.DNS_SERVER,
reference=dns_servers[i],
)
await async_capture_exception(result)
@Job(name="check_dns_server_approve", conditions=[JobCondition.INTERNET_SYSTEM])
async def approve_check(self, reference: str | None = None) -> bool:

View File

@ -5,8 +5,6 @@ from datetime import timedelta
from aiodns.error import DNSError
from supervisor.resolution.checks.dns_server import check_server
from ...const import CoreState
from ...coresys import CoreSys
from ...jobs.const import JobCondition, JobExecutionLimit
@ -14,6 +12,7 @@ from ...jobs.decorator import Job
from ...utils.sentry import async_capture_exception
from ..const import DNS_ERROR_NO_DATA, ContextType, IssueType
from .base import CheckBase
from .dns_server import check_server
def setup(coresys: CoreSys) -> CheckBase:
@ -37,18 +36,18 @@ class CheckDNSServerIPv6(CheckBase):
*[check_server(self.sys_loop, server, "AAAA") for server in dns_servers],
return_exceptions=True,
)
for i in (
r
for r in range(len(results))
if isinstance(results[r], DNSError)
and results[r].args[0] != DNS_ERROR_NO_DATA
):
self.sys_resolution.create_issue(
IssueType.DNS_SERVER_IPV6_ERROR,
ContextType.DNS_SERVER,
reference=dns_servers[i],
)
await async_capture_exception(results[i])
# pylint: disable-next=consider-using-enumerate
for i in range(len(results)):
if (
isinstance(result := results[i], DNSError)
and result.args[0] != DNS_ERROR_NO_DATA
):
self.sys_resolution.create_issue(
IssueType.DNS_SERVER_IPV6_ERROR,
ContextType.DNS_SERVER,
reference=dns_servers[i],
)
await async_capture_exception(result)
@Job(
name="check_dns_server_ipv6_approve", conditions=[JobCondition.INTERNET_SYSTEM]

View File

@ -35,6 +35,9 @@ class CheckMultipleDataDisks(CheckBase):
async def approve_check(self, reference: str | None = None) -> bool:
"""Approve check if it is affected by issue."""
if not reference:
return False
resolved = await self.sys_dbus.udisks2.resolve_device(
DeviceSpecification(path=Path(reference))
)
@ -43,7 +46,7 @@ class CheckMultipleDataDisks(CheckBase):
def _block_device_has_name_issue(self, block_device: UDisks2Block) -> bool:
"""Return true if filesystem block device incorrectly has data disk name."""
return (
block_device.filesystem
block_device.filesystem is not None
and block_device.id_label == FILESYSTEM_LABEL_DATA_DISK
and block_device.device != self.sys_dbus.agent.datadisk.current_device
)

View File

@ -1,6 +1,6 @@
"""Data objects."""
from uuid import UUID, uuid4
from uuid import uuid4
import attr
@ -20,7 +20,7 @@ class Issue:
type: IssueType = attr.ib()
context: ContextType = attr.ib()
reference: str | None = attr.ib(default=None)
uuid: UUID = attr.ib(factory=lambda: uuid4().hex, eq=False, init=False)
uuid: str = attr.ib(factory=lambda: uuid4().hex, eq=False, init=False)
@attr.s(frozen=True, slots=True)
@ -30,7 +30,7 @@ class Suggestion:
type: SuggestionType = attr.ib()
context: ContextType = attr.ib()
reference: str | None = attr.ib(default=None)
uuid: UUID = attr.ib(factory=lambda: uuid4().hex, eq=False, init=False)
uuid: str = attr.ib(factory=lambda: uuid4().hex, eq=False, init=False)
@attr.s(frozen=True, slots=True)

View File

@ -33,7 +33,7 @@ class EvaluateAppArmor(EvaluateBase):
"""Return a list of valid states when this evaluation can run."""
return [CoreState.INITIALIZE]
async def evaluate(self) -> None:
async def evaluate(self) -> bool:
"""Run evaluation."""
try:
apparmor = await self.sys_run_in_executor(

View File

@ -38,7 +38,7 @@ class EvaluateContainer(EvaluateBase):
"""Initialize the evaluation class."""
super().__init__(coresys)
self.coresys = coresys
self._images = set()
self._images: set[str] = set()
@property
def reason(self) -> UnsupportedReason:
@ -61,8 +61,8 @@ class EvaluateContainer(EvaluateBase):
return {
self.sys_homeassistant.image,
self.sys_supervisor.image,
*(plugin.image for plugin in self.sys_plugins.all_plugins),
*(addon.image for addon in self.sys_addons.installed),
*(plugin.image for plugin in self.sys_plugins.all_plugins if plugin.image),
*(addon.image for addon in self.sys_addons.installed if addon.image),
}
async def evaluate(self) -> bool:

View File

@ -29,6 +29,6 @@ class EvaluateContentTrust(EvaluateBase):
"""Return a list of valid states when this evaluation can run."""
return [CoreState.INITIALIZE, CoreState.SETUP, CoreState.RUNNING]
async def evaluate(self) -> None:
async def evaluate(self) -> bool:
"""Run evaluation."""
return not self.sys_security.content_trust

View File

@ -29,6 +29,6 @@ class EvaluateDbus(EvaluateBase):
"""Return a list of valid states when this evaluation can run."""
return [CoreState.INITIALIZE]
async def evaluate(self) -> None:
async def evaluate(self) -> bool:
"""Run evaluation."""
return not SOCKET_DBUS.exists()
return not await self.sys_run_in_executor(SOCKET_DBUS.exists)

View File

@ -29,7 +29,7 @@ class EvaluateDNSServer(EvaluateBase):
"""Return a list of valid states when this evaluation can run."""
return [CoreState.RUNNING]
async def evaluate(self) -> None:
async def evaluate(self) -> bool:
"""Run evaluation."""
return (
not self.sys_plugins.dns.fallback

View File

@ -36,7 +36,7 @@ class EvaluateDockerConfiguration(EvaluateBase):
"""Return a list of valid states when this evaluation can run."""
return [CoreState.INITIALIZE]
async def evaluate(self):
async def evaluate(self) -> bool:
"""Run evaluation."""
storage_driver = self.sys_docker.info.storage
logging_driver = self.sys_docker.info.logging

View File

@ -29,6 +29,6 @@ class EvaluateDockerVersion(EvaluateBase):
"""Return a list of valid states when this evaluation can run."""
return [CoreState.INITIALIZE]
async def evaluate(self):
async def evaluate(self) -> bool:
"""Run evaluation."""
return not self.sys_docker.info.supported_version

View File

@ -29,6 +29,6 @@ class EvaluateJobConditions(EvaluateBase):
"""Return a list of valid states when this evaluation can run."""
return [CoreState.INITIALIZE, CoreState.SETUP, CoreState.RUNNING]
async def evaluate(self) -> None:
async def evaluate(self) -> bool:
"""Run evaluation."""
return len(self.sys_jobs.ignore_conditions) > 0

View File

@ -32,10 +32,10 @@ class EvaluateLxc(EvaluateBase):
"""Return a list of valid states when this evaluation can run."""
return [CoreState.INITIALIZE]
async def evaluate(self):
async def evaluate(self) -> bool:
"""Run evaluation."""
def check_lxc():
def check_lxc() -> bool:
with suppress(OSError):
if "container=lxc" in Path("/proc/1/environ").read_text(
encoding="utf-8"

View File

@ -30,6 +30,6 @@ class EvaluateNetworkManager(EvaluateBase):
"""Return a list of valid states when this evaluation can run."""
return [CoreState.SETUP, CoreState.RUNNING]
async def evaluate(self):
async def evaluate(self) -> bool:
"""Run evaluation."""
return HostFeature.NETWORK not in self.sys_host.features

View File

@ -31,7 +31,7 @@ class EvaluateOperatingSystem(EvaluateBase):
"""Return a list of valid states when this evaluation can run."""
return [CoreState.SETUP]
async def evaluate(self):
async def evaluate(self) -> bool:
"""Run evaluation."""
if self.sys_os.available:
return False

View File

@ -30,6 +30,6 @@ class EvaluateOSAgent(EvaluateBase):
"""Return a list of valid states when this evaluation can run."""
return [CoreState.SETUP]
async def evaluate(self):
async def evaluate(self) -> bool:
"""Run evaluation."""
return HostFeature.OS_AGENT not in self.sys_host.features

View File

@ -29,6 +29,6 @@ class EvaluatePrivileged(EvaluateBase):
"""Return a list of valid states when this evaluation can run."""
return [CoreState.INITIALIZE]
async def evaluate(self):
async def evaluate(self) -> bool:
"""Run evaluation."""
return not self.sys_supervisor.instance.privileged

View File

@ -21,7 +21,7 @@ class EvaluateRestartPolicy(EvaluateBase):
"""Initialize the evaluation class."""
super().__init__(coresys)
self.coresys = coresys
self._containers: list[str] = []
self._containers: set[str] = set()
@property
def reason(self) -> UnsupportedReason:

View File

@ -29,6 +29,6 @@ class EvaluateSupervisorVersion(EvaluateBase):
"""Return a list of valid states when this evaluation can run."""
return [CoreState.RUNNING, CoreState.STARTUP]
async def evaluate(self) -> None:
async def evaluate(self) -> bool:
"""Run evaluation."""
return not self.sys_updater.auto_update and self.sys_supervisor.need_update

View File

@ -6,6 +6,14 @@ from ...host.const import HostFeature
from ..const import UnsupportedReason
from .base import EvaluateBase
FEATURES_REQUIRED: tuple[HostFeature, ...] = (
HostFeature.HOSTNAME,
HostFeature.SERVICES,
HostFeature.SHUTDOWN,
HostFeature.REBOOT,
HostFeature.TIMEDATE,
)
def setup(coresys: CoreSys) -> EvaluateBase:
"""Initialize evaluation-setup function."""
@ -30,15 +38,8 @@ class EvaluateSystemd(EvaluateBase):
"""Return a list of valid states when this evaluation can run."""
return [CoreState.SETUP]
async def evaluate(self):
async def evaluate(self) -> bool:
"""Run evaluation."""
return any(
feature not in self.sys_host.features
for feature in (
HostFeature.HOSTNAME,
HostFeature.SERVICES,
HostFeature.SHUTDOWN,
HostFeature.REBOOT,
HostFeature.TIMEDATE,
)
feature not in self.sys_host.features for feature in FEATURES_REQUIRED
)

View File

@ -29,11 +29,11 @@ class EvaluateVirtualizationImage(EvaluateBase):
"""Return a list of valid states when this evaluation can run."""
return [CoreState.SETUP]
async def evaluate(self):
async def evaluate(self) -> bool:
"""Run evaluation."""
if not self.sys_os.available:
return False
return self.sys_host.info.virtualization and self.sys_os.board not in {
return bool(self.sys_host.info.virtualization) and self.sys_os.board not in {
"ova",
"generic-aarch64",
}

View File

@ -20,7 +20,10 @@ class FixupAddonDisableBoot(FixupBase):
async def process_fixup(self, reference: str | None = None) -> None:
"""Initialize the fixup class."""
if not (addon := self.sys_addons.get(reference, local_only=True)):
if not reference:
return
if not (addon := self.sys_addons.get_local_only(reference)):
_LOGGER.info("Cannot change addon %s as it does not exist", reference)
return

View File

@ -20,7 +20,10 @@ class FixupAddonExecuteRebuild(FixupBase):
async def process_fixup(self, reference: str | None = None) -> None:
"""Rebuild the addon's container."""
addon = self.sys_addons.get(reference, local_only=True)
if not reference:
return
addon = self.sys_addons.get_local_only(reference)
if not addon:
_LOGGER.info(
"Cannot rebuild addon %s as it is not installed, dismissing suggestion",

View File

@ -20,7 +20,10 @@ class FixupAddonExecuteRemove(FixupBase):
async def process_fixup(self, reference: str | None = None) -> None:
"""Initialize the fixup class."""
if not (addon := self.sys_addons.get(reference, local_only=True)):
if not reference:
return
if not (addon := self.sys_addons.get_local_only(reference)):
_LOGGER.info("Addon %s already removed", reference)
return

View File

@ -25,7 +25,10 @@ class FixupAddonExecuteRepair(FixupBase):
async def process_fixup(self, reference: str | None = None) -> None:
"""Pull the addons image."""
addon = self.sys_addons.get(reference, local_only=True)
if not reference:
return
addon = self.sys_addons.get_local_only(reference)
if not addon:
_LOGGER.info(
"Cannot repair addon %s as it is not installed, dismissing suggestion",

View File

@ -20,7 +20,10 @@ class FixupAddonExecuteRestart(FixupBase):
async def process_fixup(self, reference: str | None = None) -> None:
"""Initialize the fixup class."""
if not (addon := self.sys_addons.get(reference, local_only=True)):
if not reference:
return
if not (addon := self.sys_addons.get_local_only(reference)):
_LOGGER.info("Cannot restart addon %s as it does not exist", reference)
return

View File

@ -21,7 +21,10 @@ class FixupAddonExecuteStart(FixupBase):
async def process_fixup(self, reference: str | None = None) -> None:
"""Initialize the fixup class."""
if not (addon := self.sys_addons.get(reference, local_only=True)):
if not reference:
return
if not (addon := self.sys_addons.get_local_only(reference)):
_LOGGER.info("Cannot start addon %s as it does not exist", reference)
return

View File

@ -22,9 +22,7 @@ class FixupBase(ABC, CoreSysAttributes):
"""Execute the evaluation."""
if not fixing_suggestion:
# Get suggestion to fix
fixing_suggestion: Suggestion | None = next(
iter(self.all_suggestions), None
)
fixing_suggestion = next(iter(self.all_suggestions), None)
# No suggestion
if fixing_suggestion is None:

View File

@ -32,6 +32,9 @@ class FixupStoreExecuteReload(FixupBase):
)
async def process_fixup(self, reference: str | None = None) -> None:
"""Initialize the fixup class."""
if not reference:
return
_LOGGER.info("Reload Store: %s", reference)
try:
repository = self.sys_store.get(reference)

View File

@ -20,6 +20,9 @@ class FixupStoreExecuteRemove(FixupBase):
async def process_fixup(self, reference: str | None = None) -> None:
"""Initialize the fixup class."""
if not reference:
return
_LOGGER.info("Remove invalid Store: %s", reference)
try:
repository = self.sys_store.get(reference)

View File

@ -34,6 +34,9 @@ class FixupStoreExecuteReset(FixupBase):
)
async def process_fixup(self, reference: str | None = None) -> None:
"""Initialize the fixup class."""
if not reference:
return
_LOGGER.info("Reset corrupt Store: %s", reference)
try:
repository = self.sys_store.get(reference)
@ -41,9 +44,11 @@ class FixupStoreExecuteReset(FixupBase):
_LOGGER.warning("Can't find store %s for fixup", reference)
return
await self.sys_run_in_executor(
partial(remove_folder, folder=repository.git.path, content_only=True)
)
# Local add-ons are not a git repo, can't remove and re-pull
if repository.git:
await self.sys_run_in_executor(
partial(remove_folder, folder=repository.git.path, content_only=True)
)
# Load data again
try:

View File

@ -23,6 +23,9 @@ class FixupSystemAdoptDataDisk(FixupBase):
async def process_fixup(self, reference: str | None = None) -> None:
"""Initialize the fixup class."""
if not reference:
return
if not (
new_resolved := await self.sys_dbus.udisks2.resolve_device(
DeviceSpecification(path=Path(reference))

View File

@ -23,6 +23,9 @@ class FixupSystemRenameDataDisk(FixupBase):
async def process_fixup(self, reference: str | None = None) -> None:
"""Initialize the fixup class."""
if not reference:
return
resolved = await self.sys_dbus.udisks2.resolve_device(
DeviceSpecification(path=Path(reference))
)

View File

@ -257,7 +257,7 @@ class ResolutionManager(FileConfiguration, CoreSysAttributes):
if not self.issues_for_suggestion(suggestion):
self.dismiss_suggestion(suggestion)
def dismiss_unsupported(self, reason: Issue) -> None:
def dismiss_unsupported(self, reason: UnsupportedReason) -> None:
"""Dismiss a reason for unsupported."""
if reason not in self._unsupported:
raise ResolutionError(f"The reason {reason} is not active", _LOGGER.warning)

View File

@ -30,7 +30,7 @@ class AddonStore(AddonModel):
@property
def is_installed(self) -> bool:
"""Return True if an add-on is installed."""
return self.sys_addons.get(self.slug, local_only=True) is not None
return self.sys_addons.get_local_only(self.slug) is not None
@property
def is_detached(self) -> bool:

View File

@ -55,6 +55,15 @@ async def api_token_validation(aiohttp_client, coresys: CoreSys) -> TestClient:
yield await aiohttp_client(api.webapp)
@pytest.fixture(name="plugin_tokens")
async def fixture_plugin_tokens(coresys: CoreSys) -> None:
"""Mock plugin tokens used in middleware."""
# pylint: disable=protected-access
coresys.plugins.cli._data["access_token"] = "c_123456"
coresys.plugins.observer._data["access_token"] = "o_123456"
# pylint: enable=protected-access
@pytest.mark.asyncio
async def test_api_security_system_initialize(api_system: TestClient, coresys: CoreSys):
"""Test security."""
@ -185,6 +194,7 @@ async def test_bad_requests(
("post", "/addons/abc123/sys_options", set()),
],
)
@pytest.mark.usefixtures("plugin_tokens")
async def test_token_validation(
api_token_validation: TestClient,
install_addon_example: Addon,
@ -210,6 +220,7 @@ async def test_token_validation(
assert resp.status == 403
@pytest.mark.usefixtures("plugin_tokens")
async def test_home_assistant_paths(api_token_validation: TestClient, coresys: CoreSys):
"""Test Home Assistant only paths."""
coresys.homeassistant.supervisor_token = "abc123"

View File

@ -209,7 +209,6 @@ async def test_load_error(
assert "Can't read resolve.tmpl" in caplog.text
assert "Can't read hosts.tmpl" in caplog.text
assert "Resolv template is missing" in caplog.text
assert coresys.core.healthy is True
caplog.clear()
@ -218,7 +217,6 @@ async def test_load_error(
assert "Can't read resolve.tmpl" in caplog.text
assert "Can't read hosts.tmpl" in caplog.text
assert "Resolv template is missing" in caplog.text
assert coresys.core.healthy is False

View File

@ -73,10 +73,6 @@ async def test_approve(coresys: CoreSys, supervisor_internet):
coresys.security.verify_secret = AsyncMock(return_value=None)
assert not await addon_pwned.approve_check(reference=addon.slug)
addon.is_installed = False
coresys.security.verify_secret = AsyncMock(side_effect=PwnedSecret)
assert not await addon_pwned.approve_check(reference=addon.slug)
async def test_with_global_disable(coresys: CoreSys, caplog):
"""Test when pwned is globally disabled."""