Create issue for detached addons (#5084)

* Create issue for detached addons

* Separate issues into missing and removed
This commit is contained in:
Mike Degatano 2024-05-23 03:36:59 -04:00 committed by GitHub
parent c4bc1e3824
commit f150a19c0f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 273 additions and 0 deletions

View File

@ -0,0 +1,49 @@
"""Helpers to check for detached addons due to repo misisng."""
from ...const import CoreState
from ...coresys import CoreSys
from ..const import ContextType, IssueType
from .base import CheckBase
def setup(coresys: CoreSys) -> CheckBase:
"""Check setup function."""
return CheckDetachedAddonMissing(coresys)
class CheckDetachedAddonMissing(CheckBase):
"""CheckDetachedAddonMissing class for check."""
async def run_check(self) -> None:
"""Run check if not affected by issue."""
for addon in self.sys_addons.installed:
if (
addon.is_detached
and addon.repository not in self.sys_store.repositories
):
self.sys_resolution.create_issue(
IssueType.DETACHED_ADDON_MISSING,
ContextType.ADDON,
reference=addon.slug,
)
async def approve_check(self, reference: str | None = None) -> bool:
"""Approve check if it is affected by issue."""
return (
addon := self.sys_addons.get(reference, local_only=True)
) and addon.is_detached
@property
def issue(self) -> IssueType:
"""Return a IssueType enum."""
return IssueType.DETACHED_ADDON_MISSING
@property
def context(self) -> ContextType:
"""Return a ContextType enum."""
return ContextType.ADDON
@property
def states(self) -> list[CoreState]:
"""Return a list of valid states when this check can run."""
return [CoreState.SETUP]

View File

@ -0,0 +1,46 @@
"""Helpers to check for detached addons due to removal from repo."""
from ...const import CoreState
from ...coresys import CoreSys
from ..const import ContextType, IssueType
from .base import CheckBase
def setup(coresys: CoreSys) -> CheckBase:
"""Check setup function."""
return CheckDetachedAddonRemoved(coresys)
class CheckDetachedAddonRemoved(CheckBase):
"""CheckDetachedAddonRemoved class for check."""
async def run_check(self) -> None:
"""Run check if not affected by issue."""
for addon in self.sys_addons.installed:
if addon.is_detached and addon.repository in self.sys_store.repositories:
self.sys_resolution.create_issue(
IssueType.DETACHED_ADDON_REMOVED,
ContextType.ADDON,
reference=addon.slug,
)
async def approve_check(self, reference: str | None = None) -> bool:
"""Approve check if it is affected by issue."""
return (
addon := self.sys_addons.get(reference, local_only=True)
) and addon.is_detached
@property
def issue(self) -> IssueType:
"""Return a IssueType enum."""
return IssueType.DETACHED_ADDON_REMOVED
@property
def context(self) -> ContextType:
"""Return a ContextType enum."""
return ContextType.ADDON
@property
def states(self) -> list[CoreState]:
"""Return a list of valid states when this check can run."""
return [CoreState.SETUP]

View File

@ -73,6 +73,8 @@ class IssueType(StrEnum):
CORRUPT_DOCKER = "corrupt_docker"
CORRUPT_REPOSITORY = "corrupt_repository"
CORRUPT_FILESYSTEM = "corrupt_filesystem"
DETACHED_ADDON_MISSING = "detached_addon_missing"
DETACHED_ADDON_REMOVED = "detached_addon_removed"
DISABLED_DATA_DISK = "disabled_data_disk"
DNS_LOOP = "dns_loop"
DNS_SERVER_FAILED = "dns_server_failed"

View File

@ -0,0 +1,85 @@
"""Test check for detached addons due to repo missing."""
from unittest.mock import patch
from supervisor.addons.addon import Addon
from supervisor.const import CoreState
from supervisor.coresys import CoreSys
from supervisor.resolution.checks.detached_addon_missing import (
CheckDetachedAddonMissing,
)
from supervisor.resolution.const import ContextType, IssueType
async def test_base(coresys: CoreSys):
"""Test check basics."""
detached_addon_missing = CheckDetachedAddonMissing(coresys)
assert detached_addon_missing.slug == "detached_addon_missing"
assert detached_addon_missing.enabled
async def test_check(coresys: CoreSys, install_addon_ssh: Addon):
"""Test check for detached addons."""
detached_addon_missing = CheckDetachedAddonMissing(coresys)
coresys.core.state = CoreState.SETUP
await detached_addon_missing()
assert len(coresys.resolution.issues) == 0
# Mock test addon was been installed from a now non-existent store
install_addon_ssh.slug = "abc123_ssh"
coresys.addons.data.system["abc123_ssh"] = coresys.addons.data.system["local_ssh"]
coresys.addons.local["abc123_ssh"] = coresys.addons.local["local_ssh"]
install_addon_ssh.data["repository"] = "abc123"
await detached_addon_missing()
assert len(coresys.resolution.issues) == 1
assert coresys.resolution.issues[0].type is IssueType.DETACHED_ADDON_MISSING
assert coresys.resolution.issues[0].context is ContextType.ADDON
assert coresys.resolution.issues[0].reference == install_addon_ssh.slug
async def test_approve(coresys: CoreSys, install_addon_ssh: Addon):
"""Test approve existing detached addon issues."""
detached_addon_missing = CheckDetachedAddonMissing(coresys)
coresys.core.state = CoreState.SETUP
assert (
await detached_addon_missing.approve_check(reference=install_addon_ssh.slug)
is False
)
# Mock test addon was been installed from a now non-existent store
install_addon_ssh.slug = "abc123_ssh"
coresys.addons.data.system["abc123_ssh"] = coresys.addons.data.system["local_ssh"]
coresys.addons.local["abc123_ssh"] = coresys.addons.local["local_ssh"]
install_addon_ssh.data["repository"] = "abc123"
assert (
await detached_addon_missing.approve_check(reference=install_addon_ssh.slug)
is True
)
async def test_did_run(coresys: CoreSys):
"""Test that the check ran as expected."""
detached_addon_missing = CheckDetachedAddonMissing(coresys)
should_run = detached_addon_missing.states
should_not_run = [state for state in CoreState if state not in should_run]
assert should_run == [CoreState.SETUP]
assert len(should_not_run) != 0
with patch.object(
CheckDetachedAddonMissing, "run_check", return_value=None
) as check:
for state in should_run:
coresys.core.state = state
await detached_addon_missing()
check.assert_called_once()
check.reset_mock()
for state in should_not_run:
coresys.core.state = state
await detached_addon_missing()
check.assert_not_called()
check.reset_mock()

View File

@ -0,0 +1,91 @@
"""Test check for detached addons due to removal from repo."""
from pathlib import Path
from unittest.mock import PropertyMock, patch
from supervisor.addons.addon import Addon
from supervisor.config import CoreConfig
from supervisor.const import CoreState
from supervisor.coresys import CoreSys
from supervisor.resolution.checks.detached_addon_removed import (
CheckDetachedAddonRemoved,
)
from supervisor.resolution.const import ContextType, IssueType
async def test_base(coresys: CoreSys):
"""Test check basics."""
detached_addon_removed = CheckDetachedAddonRemoved(coresys)
assert detached_addon_removed.slug == "detached_addon_removed"
assert detached_addon_removed.enabled
async def test_check(
coresys: CoreSys, install_addon_ssh: Addon, tmp_supervisor_data: Path
):
"""Test check for detached addons."""
detached_addon_removed = CheckDetachedAddonRemoved(coresys)
coresys.core.state = CoreState.SETUP
await detached_addon_removed()
assert len(coresys.resolution.issues) == 0
(addons_dir := tmp_supervisor_data / "addons" / "local").mkdir()
with patch.object(
CoreConfig, "path_addons_local", new=PropertyMock(return_value=addons_dir)
):
await coresys.store.load()
await detached_addon_removed()
assert len(coresys.resolution.issues) == 1
assert coresys.resolution.issues[0].type is IssueType.DETACHED_ADDON_REMOVED
assert coresys.resolution.issues[0].context is ContextType.ADDON
assert coresys.resolution.issues[0].reference == install_addon_ssh.slug
async def test_approve(
coresys: CoreSys, install_addon_ssh: Addon, tmp_supervisor_data: Path
):
"""Test approve existing detached addon issues."""
detached_addon_removed = CheckDetachedAddonRemoved(coresys)
coresys.core.state = CoreState.SETUP
assert (
await detached_addon_removed.approve_check(reference=install_addon_ssh.slug)
is False
)
(addons_dir := tmp_supervisor_data / "addons" / "local").mkdir()
with patch.object(
CoreConfig, "path_addons_local", new=PropertyMock(return_value=addons_dir)
):
await coresys.store.load()
assert (
await detached_addon_removed.approve_check(reference=install_addon_ssh.slug)
is True
)
async def test_did_run(coresys: CoreSys):
"""Test that the check ran as expected."""
detached_addon_removed = CheckDetachedAddonRemoved(coresys)
should_run = detached_addon_removed.states
should_not_run = [state for state in CoreState if state not in should_run]
assert should_run == [CoreState.SETUP]
assert len(should_not_run) != 0
with patch.object(
CheckDetachedAddonRemoved, "run_check", return_value=None
) as check:
for state in should_run:
coresys.core.state = state
await detached_addon_removed()
check.assert_called_once()
check.reset_mock()
for state in should_not_run:
coresys.core.state = state
await detached_addon_removed()
check.assert_not_called()
check.reset_mock()