Add check for plugin trust (#3080)

* Add check for plugin trust

* Update supervisor/resolution/checks/plugin_trust.py

Co-authored-by: Joakim Sørensen <joasoe@gmail.com>

Co-authored-by: Joakim Sørensen <joasoe@gmail.com>
This commit is contained in:
Pascal Vizeli 2021-08-30 10:56:05 +02:00 committed by GitHub
parent 9bf8d15b01
commit b2abe37d72
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 381 additions and 236 deletions

View File

@ -45,7 +45,7 @@ from .ingress import Ingress
from .misc.filter import filter_data from .misc.filter import filter_data
from .misc.scheduler import Scheduler from .misc.scheduler import Scheduler
from .misc.tasks import Tasks from .misc.tasks import Tasks
from .plugins import PluginManager from .plugins.manager import PluginManager
from .resolution.module import ResolutionManager from .resolution.module import ResolutionManager
from .security import Security from .security import Security
from .services import ServiceManager from .services import ServiceManager

View File

@ -33,7 +33,7 @@ if TYPE_CHECKING:
from .jobs import JobManager from .jobs import JobManager
from .misc.scheduler import Scheduler from .misc.scheduler import Scheduler
from .misc.tasks import Tasks from .misc.tasks import Tasks
from .plugins import PluginManager from .plugins.manager import PluginManager
from .resolution.module import ResolutionManager from .resolution.module import ResolutionManager
from .security import Security from .security import Security
from .services import ServiceManager from .services import ServiceManager

View File

@ -1,138 +1 @@
"""Plugin for Supervisor backend.""" """Supervisor plugins to extend functionality."""
import asyncio
import logging
from ..coresys import CoreSys, CoreSysAttributes
from ..exceptions import HassioError
from ..resolution.const import ContextType, IssueType, SuggestionType
from .audio import PluginAudio
from .cli import PluginCli
from .dns import PluginDns
from .multicast import PluginMulticast
from .observer import PluginObserver
_LOGGER: logging.Logger = logging.getLogger(__name__)
class PluginManager(CoreSysAttributes):
"""Manage supported function for plugins."""
def __init__(self, coresys: CoreSys):
"""Initialize plugin manager."""
self.coresys: CoreSys = coresys
self._cli: PluginCli = PluginCli(coresys)
self._dns: PluginDns = PluginDns(coresys)
self._audio: PluginAudio = PluginAudio(coresys)
self._observer: PluginObserver = PluginObserver(coresys)
self._multicast: PluginMulticast = PluginMulticast(coresys)
@property
def cli(self) -> PluginCli:
"""Return cli handler."""
return self._cli
@property
def dns(self) -> PluginDns:
"""Return dns handler."""
return self._dns
@property
def audio(self) -> PluginAudio:
"""Return audio handler."""
return self._audio
@property
def observer(self) -> PluginObserver:
"""Return observer handler."""
return self._observer
@property
def multicast(self) -> PluginMulticast:
"""Return multicast handler."""
return self._multicast
async def load(self) -> None:
"""Load Supervisor plugins."""
# Sequential to avoid issue on slow IO
for plugin in (
self.dns,
self.audio,
self.cli,
self.observer,
self.multicast,
):
try:
await plugin.load()
except Exception as err: # pylint: disable=broad-except
_LOGGER.warning("Can't load plugin %s: %s", plugin.slug, err)
self.sys_resolution.create_issue(
IssueType.FATAL_ERROR,
ContextType.PLUGIN,
reference=plugin.slug,
suggestions=[SuggestionType.EXECUTE_REPAIR],
)
self.sys_capture_exception(err)
# Check requirements
await self.sys_updater.reload()
for plugin in (
self.dns,
self.audio,
self.cli,
self.observer,
self.multicast,
):
# Check if need an update
if not plugin.need_update:
continue
_LOGGER.info(
"%s does not have the latest version %s, updating",
plugin.slug,
plugin.latest_version,
)
try:
await plugin.update()
except HassioError:
_LOGGER.error(
"Can't update %s to %s, the Supervisor healthy could be compromised!",
plugin.slug,
plugin.latest_version,
)
self.sys_resolution.create_issue(
IssueType.UPDATE_FAILED,
ContextType.PLUGIN,
reference=plugin.slug,
suggestions=[SuggestionType.EXECUTE_UPDATE],
)
except Exception as err: # pylint: disable=broad-except
_LOGGER.warning("Can't update plugin %s: %s", plugin.slug, err)
self.sys_capture_exception(err)
async def repair(self) -> None:
"""Repair Supervisor plugins."""
await asyncio.wait(
[
self.dns.repair(),
self.audio.repair(),
self.cli.repair(),
self.observer.repair(),
self.multicast.repair(),
]
)
async def shutdown(self) -> None:
"""Shutdown Supervisor plugin."""
# Sequential to avoid issue on slow IO
for plugin in (
self.audio,
self.cli,
self.multicast,
self.dns,
):
try:
await plugin.stop()
except Exception as err: # pylint: disable=broad-except
_LOGGER.warning("Can't stop plugin %s: %s", plugin.slug, err)
self.sys_capture_exception(err)

View File

@ -7,7 +7,7 @@ from contextlib import suppress
import logging import logging
from pathlib import Path, PurePath from pathlib import Path, PurePath
import shutil import shutil
from typing import Awaitable, Optional from typing import Optional
from awesomeversion import AwesomeVersion from awesomeversion import AwesomeVersion
import jinja2 import jinja2
@ -52,11 +52,6 @@ class PluginAudio(PluginBase):
"""Return latest version of Audio.""" """Return latest version of Audio."""
return self.sys_updater.version_audio return self.sys_updater.version_audio
@property
def in_progress(self) -> bool:
"""Return True if a task is in progress."""
return self.instance.in_progress
async def load(self) -> None: async def load(self) -> None:
"""Load Audio setup.""" """Load Audio setup."""
# Initialize Client Template # Initialize Client Template
@ -171,13 +166,6 @@ class PluginAudio(PluginBase):
_LOGGER.error("Can't stop Audio plugin") _LOGGER.error("Can't stop Audio plugin")
raise AudioError() from err raise AudioError() from err
def logs(self) -> Awaitable[bytes]:
"""Get CoreDNS docker logs.
Return Coroutine.
"""
return self.instance.logs()
async def stats(self) -> DockerStats: async def stats(self) -> DockerStats:
"""Return stats of CoreDNS.""" """Return stats of CoreDNS."""
try: try:
@ -185,13 +173,6 @@ class PluginAudio(PluginBase):
except DockerError as err: except DockerError as err:
raise AudioError() from err raise AudioError() from err
def is_running(self) -> Awaitable[bool]:
"""Return True if Docker container is running.
Return a coroutine.
"""
return self.instance.is_running()
async def repair(self) -> None: async def repair(self) -> None:
"""Repair CoreDNS plugin.""" """Repair CoreDNS plugin."""
if await self.instance.exists(): if await self.instance.exists():

View File

@ -1,18 +1,20 @@
"""Supervisor plugins base class.""" """Supervisor plugins base class."""
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Optional from typing import Awaitable, Optional
from awesomeversion import AwesomeVersion, AwesomeVersionException from awesomeversion import AwesomeVersion, AwesomeVersionException
from ..const import ATTR_IMAGE, ATTR_VERSION from ..const import ATTR_IMAGE, ATTR_VERSION
from ..coresys import CoreSysAttributes from ..coresys import CoreSysAttributes
from ..docker.interface import DockerInterface
from ..utils.common import FileConfiguration from ..utils.common import FileConfiguration
class PluginBase(ABC, FileConfiguration, CoreSysAttributes): class PluginBase(ABC, FileConfiguration, CoreSysAttributes):
"""Base class for plugins.""" """Base class for plugins."""
slug: str = "" slug: str
instance: DockerInterface
@property @property
def version(self) -> Optional[AwesomeVersion]: def version(self) -> Optional[AwesomeVersion]:
@ -49,6 +51,39 @@ class PluginBase(ABC, FileConfiguration, CoreSysAttributes):
except (AwesomeVersionException, TypeError): except (AwesomeVersionException, TypeError):
return False return False
@property
def in_progress(self) -> bool:
"""Return True if a task is in progress."""
return self.instance.in_progress
def check_trust(self) -> Awaitable[None]:
"""Calculate plugin docker content trust.
Return Coroutine.
"""
return self.instance.check_trust()
def logs(self) -> Awaitable[bytes]:
"""Get docker plugin logs.
Return Coroutine.
"""
return self.instance.logs()
def is_running(self) -> Awaitable[bool]:
"""Return True if Docker container is running.
Return a coroutine.
"""
return self.instance.is_running()
def is_failed(self) -> Awaitable[bool]:
"""Return True if a Docker container is failed state.
Return a coroutine.
"""
return self.instance.is_failed()
@abstractmethod @abstractmethod
async def load(self) -> None: async def load(self) -> None:
"""Load system plugin.""" """Load system plugin."""

View File

@ -42,11 +42,6 @@ class PluginCli(PluginBase):
"""Return an access token for the Supervisor API.""" """Return an access token for the Supervisor API."""
return self._data.get(ATTR_ACCESS_TOKEN) return self._data.get(ATTR_ACCESS_TOKEN)
@property
def in_progress(self) -> bool:
"""Return True if a task is in progress."""
return self.instance.in_progress
async def load(self) -> None: async def load(self) -> None:
"""Load cli setup.""" """Load cli setup."""
# Check cli state # Check cli state

View File

@ -7,7 +7,7 @@ from contextlib import suppress
from ipaddress import IPv4Address from ipaddress import IPv4Address
import logging import logging
from pathlib import Path from pathlib import Path
from typing import Awaitable, List, Optional from typing import List, Optional
import attr import attr
from awesomeversion import AwesomeVersion from awesomeversion import AwesomeVersion
@ -98,11 +98,6 @@ class PluginDns(PluginBase):
"""Return latest version of CoreDNS.""" """Return latest version of CoreDNS."""
return self.sys_updater.version_dns return self.sys_updater.version_dns
@property
def in_progress(self) -> bool:
"""Return True if a task is in progress."""
return self.instance.in_progress
async def load(self) -> None: async def load(self) -> None:
"""Load DNS setup.""" """Load DNS setup."""
# Initialize CoreDNS Template # Initialize CoreDNS Template
@ -369,13 +364,6 @@ class PluginDns(PluginBase):
return entry return entry
return None return None
def logs(self) -> Awaitable[bytes]:
"""Get CoreDNS docker logs.
Return Coroutine.
"""
return self.instance.logs()
async def stats(self) -> DockerStats: async def stats(self) -> DockerStats:
"""Return stats of CoreDNS.""" """Return stats of CoreDNS."""
try: try:
@ -383,20 +371,6 @@ class PluginDns(PluginBase):
except DockerError as err: except DockerError as err:
raise CoreDNSError() from err raise CoreDNSError() from err
def is_running(self) -> Awaitable[bool]:
"""Return True if Docker container is running.
Return a coroutine.
"""
return self.instance.is_running()
def is_failed(self) -> Awaitable[bool]:
"""Return True if a Docker container is failed state.
Return a coroutine.
"""
return self.instance.is_failed()
async def repair(self) -> None: async def repair(self) -> None:
"""Repair CoreDNS plugin.""" """Repair CoreDNS plugin."""
if await self.instance.exists(): if await self.instance.exists():

View File

@ -0,0 +1,138 @@
"""Plugin for Supervisor backend."""
import asyncio
import logging
from ..coresys import CoreSys, CoreSysAttributes
from ..exceptions import HassioError
from ..resolution.const import ContextType, IssueType, SuggestionType
from .audio import PluginAudio
from .cli import PluginCli
from .dns import PluginDns
from .multicast import PluginMulticast
from .observer import PluginObserver
_LOGGER: logging.Logger = logging.getLogger(__name__)
class PluginManager(CoreSysAttributes):
"""Manage supported function for plugins."""
def __init__(self, coresys: CoreSys):
"""Initialize plugin manager."""
self.coresys: CoreSys = coresys
self._cli: PluginCli = PluginCli(coresys)
self._dns: PluginDns = PluginDns(coresys)
self._audio: PluginAudio = PluginAudio(coresys)
self._observer: PluginObserver = PluginObserver(coresys)
self._multicast: PluginMulticast = PluginMulticast(coresys)
@property
def cli(self) -> PluginCli:
"""Return cli handler."""
return self._cli
@property
def dns(self) -> PluginDns:
"""Return dns handler."""
return self._dns
@property
def audio(self) -> PluginAudio:
"""Return audio handler."""
return self._audio
@property
def observer(self) -> PluginObserver:
"""Return observer handler."""
return self._observer
@property
def multicast(self) -> PluginMulticast:
"""Return multicast handler."""
return self._multicast
async def load(self) -> None:
"""Load Supervisor plugins."""
# Sequential to avoid issue on slow IO
for plugin in (
self.dns,
self.audio,
self.cli,
self.observer,
self.multicast,
):
try:
await plugin.load()
except Exception as err: # pylint: disable=broad-except
_LOGGER.warning("Can't load plugin %s: %s", plugin.slug, err)
self.sys_resolution.create_issue(
IssueType.FATAL_ERROR,
ContextType.PLUGIN,
reference=plugin.slug,
suggestions=[SuggestionType.EXECUTE_REPAIR],
)
self.sys_capture_exception(err)
# Check requirements
await self.sys_updater.reload()
for plugin in (
self.dns,
self.audio,
self.cli,
self.observer,
self.multicast,
):
# Check if need an update
if not plugin.need_update:
continue
_LOGGER.info(
"%s does not have the latest version %s, updating",
plugin.slug,
plugin.latest_version,
)
try:
await plugin.update()
except HassioError:
_LOGGER.error(
"Can't update %s to %s, the Supervisor healthy could be compromised!",
plugin.slug,
plugin.latest_version,
)
self.sys_resolution.create_issue(
IssueType.UPDATE_FAILED,
ContextType.PLUGIN,
reference=plugin.slug,
suggestions=[SuggestionType.EXECUTE_UPDATE],
)
except Exception as err: # pylint: disable=broad-except
_LOGGER.warning("Can't update plugin %s: %s", plugin.slug, err)
self.sys_capture_exception(err)
async def repair(self) -> None:
"""Repair Supervisor plugins."""
await asyncio.wait(
[
self.dns.repair(),
self.audio.repair(),
self.cli.repair(),
self.observer.repair(),
self.multicast.repair(),
]
)
async def shutdown(self) -> None:
"""Shutdown Supervisor plugin."""
# Sequential to avoid issue on slow IO
for plugin in (
self.audio,
self.cli,
self.multicast,
self.dns,
):
try:
await plugin.stop()
except Exception as err: # pylint: disable=broad-except
_LOGGER.warning("Can't stop plugin %s: %s", plugin.slug, err)
self.sys_capture_exception(err)

View File

@ -5,7 +5,7 @@ Code: https://github.com/home-assistant/plugin-multicast
import asyncio import asyncio
from contextlib import suppress from contextlib import suppress
import logging import logging
from typing import Awaitable, Optional from typing import Optional
from awesomeversion import AwesomeVersion from awesomeversion import AwesomeVersion
@ -35,11 +35,6 @@ class PluginMulticast(PluginBase):
"""Return latest version of Multicast.""" """Return latest version of Multicast."""
return self.sys_updater.version_multicast return self.sys_updater.version_multicast
@property
def in_progress(self) -> bool:
"""Return True if a task is in progress."""
return self.instance.in_progress
async def load(self) -> None: async def load(self) -> None:
"""Load multicast setup.""" """Load multicast setup."""
# Check Multicast state # Check Multicast state
@ -143,13 +138,6 @@ class PluginMulticast(PluginBase):
_LOGGER.error("Can't stop Multicast plugin") _LOGGER.error("Can't stop Multicast plugin")
raise MulticastError() from err raise MulticastError() from err
def logs(self) -> Awaitable[bytes]:
"""Get Multicast docker logs.
Return Coroutine.
"""
return self.instance.logs()
async def stats(self) -> DockerStats: async def stats(self) -> DockerStats:
"""Return stats of Multicast.""" """Return stats of Multicast."""
try: try:
@ -157,20 +145,6 @@ class PluginMulticast(PluginBase):
except DockerError as err: except DockerError as err:
raise MulticastError() from err raise MulticastError() from err
def is_running(self) -> Awaitable[bool]:
"""Return True if Docker container is running.
Return a coroutine.
"""
return self.instance.is_running()
def is_failed(self) -> Awaitable[bool]:
"""Return True if a Docker container is failed state.
Return a coroutine.
"""
return self.instance.is_failed()
async def repair(self) -> None: async def repair(self) -> None:
"""Repair Multicast plugin.""" """Repair Multicast plugin."""
if await self.instance.exists(): if await self.instance.exists():

View File

@ -6,7 +6,7 @@ import asyncio
from contextlib import suppress from contextlib import suppress
import logging import logging
import secrets import secrets
from typing import Awaitable, Optional from typing import Optional
import aiohttp import aiohttp
from awesomeversion import AwesomeVersion from awesomeversion import AwesomeVersion
@ -43,11 +43,6 @@ class PluginObserver(PluginBase):
"""Return an access token for the Observer API.""" """Return an access token for the Observer API."""
return self._data.get(ATTR_ACCESS_TOKEN) return self._data.get(ATTR_ACCESS_TOKEN)
@property
def in_progress(self) -> bool:
"""Return True if a task is in progress."""
return self.instance.in_progress
async def load(self) -> None: async def load(self) -> None:
"""Load observer setup.""" """Load observer setup."""
# Check observer state # Check observer state
@ -145,13 +140,6 @@ class PluginObserver(PluginBase):
except DockerError as err: except DockerError as err:
raise ObserverError() from err raise ObserverError() from err
def is_running(self) -> Awaitable[bool]:
"""Return True if Docker container is running.
Return a coroutine.
"""
return self.instance.is_running()
async def rebuild(self) -> None: async def rebuild(self) -> None:
"""Rebuild Observer Docker container.""" """Rebuild Observer Docker container."""
with suppress(DockerError): with suppress(DockerError):

View File

@ -0,0 +1,77 @@
"""Helpers to check plugin trust."""
import logging
from typing import List, Optional
from ...const import CoreState
from ...coresys import CoreSys
from ...exceptions import CodeNotaryError, CodeNotaryUntrusted
from ..const import ContextType, IssueType, UnhealthyReason
from .base import CheckBase
_LOGGER: logging.Logger = logging.getLogger(__name__)
def setup(coresys: CoreSys) -> CheckBase:
"""Check setup function."""
return CheckPluginTrust(coresys)
class CheckPluginTrust(CheckBase):
"""CheckPluginTrust class for check."""
async def run_check(self) -> None:
"""Run check if not affected by issue."""
if not self.sys_security.content_trust:
_LOGGER.warning(
"Skipping %s, content_trust is globally disabled", self.slug
)
return
for plugin in (
self.sys_plugins.dns,
self.sys_plugins.audio,
self.sys_plugins.cli,
self.sys_plugins.observer,
self.sys_plugins.multicast,
):
try:
await plugin.check_trust()
except CodeNotaryUntrusted:
self.sys_resolution.unhealthy = UnhealthyReason.UNTRUSTED
self.sys_resolution.create_issue(
IssueType.TRUST, ContextType.PLUGIN, reference=plugin.slug
)
except CodeNotaryError:
pass
async def approve_check(self, reference: Optional[str] = None) -> bool:
"""Approve check if it is affected by issue."""
for plugin in (
self.sys_plugins.dns,
self.sys_plugins.audio,
self.sys_plugins.cli,
self.sys_plugins.observer,
self.sys_plugins.multicast,
):
if reference != plugin.slug:
continue
try:
await plugin.check_trust()
except CodeNotaryError:
return True
return False
@property
def issue(self) -> IssueType:
"""Return a IssueType enum."""
return IssueType.TRUST
@property
def context(self) -> ContextType:
"""Return a ContextType enum."""
return ContextType.PLUGIN
@property
def states(self) -> List[CoreState]:
"""Return a list of valid states when this check can run."""
return [CoreState.RUNNING, CoreState.STARTUP]

View File

@ -1,4 +1,4 @@
"""Test Check Addon Pwned.""" """Test Check Core trust."""
# pylint: disable=import-error,protected-access # pylint: disable=import-error,protected-access
from unittest.mock import AsyncMock, patch from unittest.mock import AsyncMock, patch

View File

@ -0,0 +1,120 @@
"""Test Check Plugin trust."""
# pylint: disable=import-error,protected-access
from unittest.mock import AsyncMock, patch
from supervisor.const import CoreState
from supervisor.coresys import CoreSys
from supervisor.exceptions import CodeNotaryError, CodeNotaryUntrusted
from supervisor.resolution.checks.plugin_trust import CheckPluginTrust
from supervisor.resolution.const import IssueType, UnhealthyReason
async def test_base(coresys: CoreSys):
"""Test check basics."""
plugin_trust = CheckPluginTrust(coresys)
assert plugin_trust.slug == "plugin_trust"
assert plugin_trust.enabled
async def test_check(coresys: CoreSys):
"""Test check."""
plugin_trust = CheckPluginTrust(coresys)
coresys.core.state = CoreState.RUNNING
assert len(coresys.resolution.issues) == 0
coresys.plugins.audio.check_trust = AsyncMock(side_effect=CodeNotaryError)
coresys.plugins.dns.check_trust = AsyncMock(side_effect=CodeNotaryError)
coresys.plugins.cli.check_trust = AsyncMock(side_effect=CodeNotaryError)
coresys.plugins.multicast.check_trust = AsyncMock(side_effect=CodeNotaryError)
coresys.plugins.observer.check_trust = AsyncMock(side_effect=CodeNotaryError)
await plugin_trust.run_check()
assert coresys.plugins.audio.check_trust.called
assert coresys.plugins.dns.check_trust.called
assert coresys.plugins.cli.check_trust.called
assert coresys.plugins.multicast.check_trust.called
assert coresys.plugins.observer.check_trust.called
coresys.plugins.audio.check_trust = AsyncMock(return_value=None)
coresys.plugins.dns.check_trust = AsyncMock(return_value=None)
coresys.plugins.cli.check_trust = AsyncMock(return_value=None)
coresys.plugins.multicast.check_trust = AsyncMock(return_value=None)
coresys.plugins.observer.check_trust = AsyncMock(return_value=None)
await plugin_trust.run_check()
assert coresys.plugins.audio.check_trust.called
assert coresys.plugins.dns.check_trust.called
assert coresys.plugins.cli.check_trust.called
assert coresys.plugins.multicast.check_trust.called
assert coresys.plugins.observer.check_trust.called
assert len(coresys.resolution.issues) == 0
coresys.plugins.audio.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
coresys.plugins.dns.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
coresys.plugins.cli.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
coresys.plugins.multicast.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
coresys.plugins.observer.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
await plugin_trust.run_check()
assert coresys.plugins.audio.check_trust.called
assert coresys.plugins.dns.check_trust.called
assert coresys.plugins.cli.check_trust.called
assert coresys.plugins.multicast.check_trust.called
assert coresys.plugins.observer.check_trust.called
assert len(coresys.resolution.issues) == 5
assert coresys.resolution.issues[-1].type == IssueType.TRUST
assert UnhealthyReason.UNTRUSTED in coresys.resolution.unhealthy
async def test_approve(coresys: CoreSys):
"""Test check."""
plugin_trust = CheckPluginTrust(coresys)
coresys.core.state = CoreState.RUNNING
coresys.plugins.audio.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
assert await plugin_trust.approve_check(reference="audio")
coresys.plugins.audio.check_trust = AsyncMock(return_value=None)
assert not await plugin_trust.approve_check(reference="audio")
async def test_with_global_disable(coresys: CoreSys, caplog):
"""Test when pwned is globally disabled."""
coresys.security.content_trust = False
plugin_trust = CheckPluginTrust(coresys)
coresys.core.state = CoreState.RUNNING
assert len(coresys.resolution.issues) == 0
coresys.security.verify_own_content = AsyncMock(side_effect=CodeNotaryUntrusted)
await plugin_trust.run_check()
assert not coresys.security.verify_own_content.called
assert "Skipping plugin_trust, content_trust is globally disabled" in caplog.text
async def test_did_run(coresys: CoreSys):
"""Test that the check ran as expected."""
plugin_trust = CheckPluginTrust(coresys)
should_run = plugin_trust.states
should_not_run = [state for state in CoreState if state not in should_run]
assert len(should_run) != 0
assert len(should_not_run) != 0
with patch(
"supervisor.resolution.checks.plugin_trust.CheckPluginTrust.run_check",
return_value=None,
) as check:
for state in should_run:
coresys.core.state = state
await plugin_trust()
check.assert_called_once()
check.reset_mock()
for state in should_not_run:
coresys.core.state = state
await plugin_trust()
check.assert_not_called()
check.reset_mock()

View File

@ -1,4 +1,4 @@
"""Test Check Addon Pwned.""" """Test Check Supervisor trust."""
# pylint: disable=import-error,protected-access # pylint: disable=import-error,protected-access
from unittest.mock import AsyncMock, patch from unittest.mock import AsyncMock, patch