Use status 404 in more places when appropriate (#5480)

This commit is contained in:
Mike Degatano 2024-12-17 05:18:32 -05:00 committed by GitHub
parent de68868788
commit d8101ddba8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 374 additions and 139 deletions

View File

@ -106,6 +106,7 @@ from ..exceptions import (
APIAddonNotInstalled,
APIError,
APIForbidden,
APINotFound,
PwnedError,
PwnedSecret,
)
@ -161,7 +162,7 @@ class APIAddons(CoreSysAttributes):
addon = self.sys_addons.get(addon_slug)
if not addon:
raise APIError(f"Addon {addon_slug} does not exist")
raise APINotFound(f"Addon {addon_slug} does not exist")
if not isinstance(addon, Addon) or not addon.is_installed:
raise APIAddonNotInstalled("Addon is not installed")

View File

@ -16,7 +16,7 @@ from ..const import (
AddonState,
)
from ..coresys import CoreSysAttributes
from ..exceptions import APIError, APIForbidden
from ..exceptions import APIForbidden, APINotFound
from .utils import api_process, api_validate, require_home_assistant
_LOGGER: logging.Logger = logging.getLogger(__name__)
@ -36,7 +36,7 @@ class APIDiscovery(CoreSysAttributes):
"""Extract discovery message from URL."""
message = self.sys_discovery.get(request.match_info.get("uuid"))
if not message:
raise APIError("Discovery message not found")
raise APINotFound("Discovery message not found")
return message
@api_process

View File

@ -16,6 +16,7 @@ from ..const import (
ATTR_VERSION,
)
from ..coresys import CoreSysAttributes
from ..exceptions import APINotFound
from .utils import api_process, api_validate
_LOGGER: logging.Logger = logging.getLogger(__name__)
@ -58,6 +59,9 @@ class APIDocker(CoreSysAttributes):
async def remove_registry(self, request: web.Request):
"""Delete a docker registry."""
hostname = request.match_info.get(ATTR_HOSTNAME)
if hostname not in self.sys_docker.config.registries:
raise APINotFound(f"Hostname {hostname} does not exist in registries")
del self.sys_docker.config.registries[hostname]
self.sys_docker.config.save_data()

View File

@ -7,7 +7,7 @@ from aiohttp import web
import voluptuous as vol
from ..coresys import CoreSysAttributes
from ..exceptions import APIError
from ..exceptions import APIError, APINotFound, JobNotFound
from ..jobs import SupervisorJob
from ..jobs.const import ATTR_IGNORE_CONDITIONS, JobCondition
from .const import ATTR_JOBS
@ -23,6 +23,13 @@ SCHEMA_OPTIONS = vol.Schema(
class APIJobs(CoreSysAttributes):
"""Handle RESTful API for OS functions."""
def _extract_job(self, request: web.Request) -> SupervisorJob:
"""Extract job from request or raise."""
try:
return self.sys_jobs.get_job(request.match_info.get("uuid"))
except JobNotFound:
raise APINotFound("Job does not exist") from None
def _list_jobs(self, start: SupervisorJob | None = None) -> list[dict[str, Any]]:
"""Return current job tree."""
jobs_by_parent: dict[str | None, list[SupervisorJob]] = {}
@ -86,13 +93,13 @@ class APIJobs(CoreSysAttributes):
@api_process
async def job_info(self, request: web.Request) -> dict[str, Any]:
"""Get details of a job by ID."""
job = self.sys_jobs.get_job(request.match_info.get("uuid"))
job = self._extract_job(request)
return self._list_jobs(job)[0]
@api_process
async def remove_job(self, request: web.Request) -> None:
"""Remove a completed job."""
job = self.sys_jobs.get_job(request.match_info.get("uuid"))
job = self._extract_job(request)
if not job.done:
raise APIError(f"Job {job.uuid} is not done!")

View File

@ -7,7 +7,7 @@ import voluptuous as vol
from ..const import ATTR_NAME, ATTR_STATE
from ..coresys import CoreSysAttributes
from ..exceptions import APIError
from ..exceptions import APIError, APINotFound
from ..mounts.const import ATTR_DEFAULT_BACKUP_MOUNT, MountUsage
from ..mounts.mount import Mount
from ..mounts.validate import SCHEMA_MOUNT_CONFIG
@ -24,6 +24,13 @@ SCHEMA_OPTIONS = vol.Schema(
class APIMounts(CoreSysAttributes):
"""Handle REST API for mounting options."""
def _extract_mount(self, request: web.Request) -> Mount:
"""Extract mount from request or raise."""
name = request.match_info.get("mount")
if name not in self.sys_mounts:
raise APINotFound(f"No mount exists with name {name}")
return self.sys_mounts.get(name)
@api_process
async def info(self, request: web.Request) -> dict[str, Any]:
"""Return MountManager info."""
@ -85,15 +92,13 @@ class APIMounts(CoreSysAttributes):
@api_process
async def update_mount(self, request: web.Request) -> None:
"""Update an existing mount in supervisor."""
name = request.match_info.get("mount")
current = self._extract_mount(request)
name_schema = vol.Schema(
{vol.Optional(ATTR_NAME, default=name): name}, extra=vol.ALLOW_EXTRA
{vol.Optional(ATTR_NAME, default=current.name): current.name},
extra=vol.ALLOW_EXTRA,
)
body = await api_validate(vol.All(name_schema, SCHEMA_MOUNT_CONFIG), request)
if name not in self.sys_mounts:
raise APIError(f"No mount exists with name {name}")
mount = Mount.from_dict(self.coresys, body)
await self.sys_mounts.create_mount(mount)
@ -110,8 +115,8 @@ class APIMounts(CoreSysAttributes):
@api_process
async def delete_mount(self, request: web.Request) -> None:
"""Delete an existing mount in supervisor."""
name = request.match_info.get("mount")
mount = await self.sys_mounts.remove_mount(name)
current = self._extract_mount(request)
mount = await self.sys_mounts.remove_mount(current.name)
# If it was a backup mount, reload backups
if mount.usage == MountUsage.BACKUP:
@ -122,9 +127,9 @@ class APIMounts(CoreSysAttributes):
@api_process
async def reload_mount(self, request: web.Request) -> None:
"""Reload an existing mount in supervisor."""
name = request.match_info.get("mount")
await self.sys_mounts.reload_mount(name)
mount = self._extract_mount(request)
await self.sys_mounts.reload_mount(mount.name)
# If it's a backup mount, reload backups
if self.sys_mounts.get(name).usage == MountUsage.BACKUP:
if mount.usage == MountUsage.BACKUP:
self.sys_create_task(self.sys_backups.reload())

View File

@ -42,7 +42,7 @@ from ..const import (
DOCKER_NETWORK_MASK,
)
from ..coresys import CoreSysAttributes
from ..exceptions import APIError, HostNetworkNotFound
from ..exceptions import APIError, APINotFound, HostNetworkNotFound
from ..host.configuration import (
AccessPoint,
Interface,
@ -167,7 +167,7 @@ class APINetwork(CoreSysAttributes):
except HostNetworkNotFound:
pass
raise APIError(f"Interface {name} does not exist") from None
raise APINotFound(f"Interface {name} does not exist") from None
@api_process
async def info(self, request: web.Request) -> dict[str, Any]:

View File

@ -19,8 +19,9 @@ from ..const import (
ATTR_UNSUPPORTED,
)
from ..coresys import CoreSysAttributes
from ..exceptions import APIError, ResolutionNotFound
from ..resolution.data import Suggestion
from ..exceptions import APINotFound, ResolutionNotFound
from ..resolution.checks.base import CheckBase
from ..resolution.data import Issue, Suggestion
from .utils import api_process, api_validate
SCHEMA_CHECK_OPTIONS = vol.Schema({vol.Optional(ATTR_ENABLED): bool})
@ -29,6 +30,29 @@ SCHEMA_CHECK_OPTIONS = vol.Schema({vol.Optional(ATTR_ENABLED): bool})
class APIResoulution(CoreSysAttributes):
"""Handle REST API for resoulution."""
def _extract_issue(self, request: web.Request) -> Issue:
"""Extract issue from request or raise."""
try:
return self.sys_resolution.get_issue(request.match_info.get("issue"))
except ResolutionNotFound:
raise APINotFound("The supplied UUID is not a valid issue") from None
def _extract_suggestion(self, request: web.Request) -> Suggestion:
"""Extract suggestion from request or raise."""
try:
return self.sys_resolution.get_suggestion(
request.match_info.get("suggestion")
)
except ResolutionNotFound:
raise APINotFound("The supplied UUID is not a valid suggestion") from None
def _extract_check(self, request: web.Request) -> CheckBase:
"""Extract check from request or raise."""
try:
return self.sys_resolution.check.get(request.match_info.get("check"))
except ResolutionNotFound:
raise APINotFound("The supplied check slug is not available") from None
def _generate_suggestion_information(self, suggestion: Suggestion):
"""Generate suggestion information for response."""
resp = attr.asdict(suggestion)
@ -61,47 +85,31 @@ class APIResoulution(CoreSysAttributes):
@api_process
async def apply_suggestion(self, request: web.Request) -> None:
"""Apply suggestion."""
try:
suggestion = self.sys_resolution.get_suggestion(
request.match_info.get("suggestion")
)
await self.sys_resolution.apply_suggestion(suggestion)
except ResolutionNotFound:
raise APIError("The supplied UUID is not a valid suggestion") from None
suggestion = self._extract_suggestion(request)
await self.sys_resolution.apply_suggestion(suggestion)
@api_process
async def dismiss_suggestion(self, request: web.Request) -> None:
"""Dismiss suggestion."""
try:
suggestion = self.sys_resolution.get_suggestion(
request.match_info.get("suggestion")
)
self.sys_resolution.dismiss_suggestion(suggestion)
except ResolutionNotFound:
raise APIError("The supplied UUID is not a valid suggestion") from None
suggestion = self._extract_suggestion(request)
self.sys_resolution.dismiss_suggestion(suggestion)
@api_process
async def suggestions_for_issue(self, request: web.Request) -> dict[str, Any]:
"""Return suggestions that fix an issue."""
try:
issue = self.sys_resolution.get_issue(request.match_info.get("issue"))
return {
ATTR_SUGGESTIONS: [
self._generate_suggestion_information(suggestion)
for suggestion in self.sys_resolution.suggestions_for_issue(issue)
]
}
except ResolutionNotFound:
raise APIError("The supplied UUID is not a valid issue") from None
issue = self._extract_issue(request)
return {
ATTR_SUGGESTIONS: [
self._generate_suggestion_information(suggestion)
for suggestion in self.sys_resolution.suggestions_for_issue(issue)
]
}
@api_process
async def dismiss_issue(self, request: web.Request) -> None:
"""Dismiss issue."""
try:
issue = self.sys_resolution.get_issue(request.match_info.get("issue"))
self.sys_resolution.dismiss_issue(issue)
except ResolutionNotFound:
raise APIError("The supplied UUID is not a valid issue") from None
issue = self._extract_issue(request)
self.sys_resolution.dismiss_issue(issue)
@api_process
def healthcheck(self, request: web.Request) -> Awaitable[None]:
@ -112,11 +120,7 @@ class APIResoulution(CoreSysAttributes):
async def options_check(self, request: web.Request) -> None:
"""Set options for check."""
body = await api_validate(SCHEMA_CHECK_OPTIONS, request)
try:
check = self.sys_resolution.check.get(request.match_info.get("check"))
except ResolutionNotFound:
raise APIError("The supplied check slug is not available") from None
check = self._extract_check(request)
# Apply options
if ATTR_ENABLED in body:
@ -127,9 +131,5 @@ class APIResoulution(CoreSysAttributes):
@api_process
async def run_check(self, request: web.Request) -> None:
"""Execute a backend check."""
try:
check = self.sys_resolution.check.get(request.match_info.get("check"))
except ResolutionNotFound:
raise APIError("The supplied check slug is not available") from None
check = self._extract_check(request)
await check()

View File

@ -9,7 +9,7 @@ from ..const import (
REQUEST_FROM,
)
from ..coresys import CoreSysAttributes
from ..exceptions import APIError, APIForbidden
from ..exceptions import APIError, APIForbidden, APINotFound
from .utils import api_process, api_validate
@ -20,7 +20,7 @@ class APIServices(CoreSysAttributes):
"""Return service, throw an exception if it doesn't exist."""
service = self.sys_services.get(request.match_info.get("service"))
if not service:
raise APIError("Service does not exist")
raise APINotFound("Service does not exist")
return service

View File

@ -51,7 +51,7 @@ from ..const import (
REQUEST_FROM,
)
from ..coresys import CoreSysAttributes
from ..exceptions import APIError, APIForbidden
from ..exceptions import APIError, APIForbidden, APINotFound
from ..store.addon import AddonStore
from ..store.repository import Repository
from ..store.validate import validate_repository
@ -74,19 +74,17 @@ class APIStore(CoreSysAttributes):
def _extract_addon(self, request: web.Request, installed=False) -> AnyAddon:
"""Return add-on, throw an exception it it doesn't exist."""
addon_slug: str = request.match_info.get("addon")
addon_version: str = request.match_info.get("version", "latest")
if installed:
addon = self.sys_addons.local.get(addon_slug)
if addon is None or not addon.is_installed:
raise APIError(f"Addon {addon_slug} is not installed")
else:
addon = self.sys_addons.store.get(addon_slug)
if not (addon := self.sys_addons.get(addon_slug)):
raise APINotFound(f"Addon {addon_slug} does not exist")
if not addon:
raise APIError(
f"Addon {addon_slug} with version {addon_version} does not exist in the store"
)
if installed and not addon.is_installed:
raise APIError(f"Addon {addon_slug} is not installed")
if not installed and addon.is_installed:
if not addon.addon_store:
raise APINotFound(f"Addon {addon_slug} does not exist in the store")
return addon.addon_store
return addon
@ -94,11 +92,12 @@ class APIStore(CoreSysAttributes):
"""Return repository, throw an exception it it doesn't exist."""
repository_slug: str = request.match_info.get("repository")
repository = self.sys_store.get(repository_slug)
if not repository:
raise APIError(f"Repository {repository_slug} does not exist in the store")
if repository_slug not in self.sys_store.repositories:
raise APINotFound(
f"Repository {repository_slug} does not exist in the store"
)
return repository
return self.sys_store.get(repository_slug)
def _generate_addon_information(
self, addon: AddonStore, extended: bool = False

View File

@ -3,6 +3,7 @@
import asyncio
from unittest.mock import MagicMock, PropertyMock, patch
from aiohttp import ClientResponse
from aiohttp.test_utils import TestClient
import pytest
@ -82,7 +83,7 @@ async def test_api_addon_logs_not_installed(api_client: TestClient):
"""Test error is returned for non-existing add-on."""
resp = await api_client.get("/addons/hic_sunt_leones/logs")
assert resp.status == 400
assert resp.status == 404
assert resp.content_type == "text/plain"
content = await resp.text()
assert content == "Addon hic_sunt_leones does not exist"
@ -366,3 +367,71 @@ async def test_addon_options_boot_mode_manual_only_invalid(
body["message"]
== "Addon local_example boot option is set to manual_only so it cannot be changed"
)
async def get_message(resp: ClientResponse, json_expected: bool) -> str:
"""Get message from response based on response type."""
if json_expected:
body = await resp.json()
return body["message"]
return await resp.text()
@pytest.mark.parametrize(
("method", "url", "json_expected"),
[
("get", "/addons/bad/info", True),
("post", "/addons/bad/uninstall", True),
("post", "/addons/bad/start", True),
("post", "/addons/bad/stop", True),
("post", "/addons/bad/restart", True),
("post", "/addons/bad/options", True),
("post", "/addons/bad/sys_options", True),
("post", "/addons/bad/options/validate", True),
("post", "/addons/bad/rebuild", True),
("post", "/addons/bad/stdin", True),
("post", "/addons/bad/security", True),
("get", "/addons/bad/stats", True),
("get", "/addons/bad/logs", False),
("get", "/addons/bad/logs/follow", False),
("get", "/addons/bad/logs/boots/1", False),
("get", "/addons/bad/logs/boots/1/follow", False),
],
)
async def test_addon_not_found(
api_client: TestClient, method: str, url: str, json_expected: bool
):
"""Test addon not found error."""
resp = await api_client.request(method, url)
assert resp.status == 404
assert await get_message(resp, json_expected) == "Addon bad does not exist"
@pytest.mark.parametrize(
("method", "url", "json_expected"),
[
("post", "/addons/local_ssh/uninstall", True),
("post", "/addons/local_ssh/start", True),
("post", "/addons/local_ssh/stop", True),
("post", "/addons/local_ssh/restart", True),
("post", "/addons/local_ssh/options", True),
("post", "/addons/local_ssh/sys_options", True),
("post", "/addons/local_ssh/options/validate", True),
("post", "/addons/local_ssh/rebuild", True),
("post", "/addons/local_ssh/stdin", True),
("post", "/addons/local_ssh/security", True),
("get", "/addons/local_ssh/stats", True),
("get", "/addons/local_ssh/logs", False),
("get", "/addons/local_ssh/logs/follow", False),
("get", "/addons/local_ssh/logs/boots/1", False),
("get", "/addons/local_ssh/logs/boots/1/follow", False),
],
)
@pytest.mark.usefixtures("repository")
async def test_addon_not_installed(
api_client: TestClient, method: str, url: str, json_expected: bool
):
"""Test addon not installed error."""
resp = await api_client.request(method, url)
assert resp.status == 400
assert await get_message(resp, json_expected) == "Addon is not installed"

View File

@ -138,3 +138,15 @@ async def test_api_invalid_discovery(api_client: TestClient, install_addon_ssh:
resp = await api_client.post("/discovery", json={"service": "test", "config": None})
assert resp.status == 400
@pytest.mark.parametrize(
("method", "url"),
[("get", "/discovery/bad"), ("delete", "/discovery/bad")],
)
async def test_discovery_not_found(api_client: TestClient, method: str, url: str):
"""Test discovery not found error."""
resp = await api_client.request(method, url)
assert resp.status == 404
resp = await resp.json()
assert resp["message"] == "Discovery message not found"

View File

@ -1,10 +1,11 @@
"""Test Docker API."""
from aiohttp.test_utils import TestClient
import pytest
@pytest.mark.asyncio
async def test_api_docker_info(api_client):
async def test_api_docker_info(api_client: TestClient):
"""Test docker info api."""
resp = await api_client.get("/docker/info")
result = await resp.json()
@ -12,3 +13,11 @@ async def test_api_docker_info(api_client):
assert result["data"]["logging"] == "journald"
assert result["data"]["storage"] == "overlay2"
assert result["data"]["version"] == "1.0.0"
async def test_registry_not_found(api_client: TestClient):
"""Test registry not found error."""
resp = await api_client.delete("/docker/registries/bad")
assert resp.status == 404
body = await resp.json()
assert body["message"] == "Hostname bad does not exist in registries"

View File

@ -4,6 +4,7 @@ import asyncio
from unittest.mock import ANY
from aiohttp.test_utils import TestClient
import pytest
from supervisor.coresys import CoreSys
from supervisor.jobs.const import ATTR_IGNORE_CONDITIONS, JobCondition
@ -213,6 +214,18 @@ async def test_job_manual_cleanup(api_client: TestClient, coresys: CoreSys):
# Confirm it no longer exists
resp = await api_client.get(f"/jobs/{test.job_id}")
assert resp.status == 400
assert resp.status == 404
result = await resp.json()
assert result["message"] == f"No job found with id {test.job_id}"
assert result["message"] == "Job does not exist"
@pytest.mark.parametrize(
("method", "url"),
[("get", "/jobs/bad"), ("delete", "/jobs/bad")],
)
async def test_job_not_found(api_client: TestClient, method: str, url: str):
"""Test job not found error."""
resp = await api_client.request(method, url)
assert resp.status == 404
body = await resp.json()
assert body["message"] == "Job does not exist"

View File

@ -264,25 +264,6 @@ async def test_api_update_mount(
coresys.mounts.save_data.assert_called_once()
async def test_api_update_error_mount_missing(
api_client: TestClient, mount_propagation
):
"""Test update mount API errors when mount does not exist."""
resp = await api_client.put(
"/mounts/backup_test",
json={
"type": "cifs",
"usage": "backup",
"server": "backup.local",
"share": "new_backups",
},
)
assert resp.status == 400
result = await resp.json()
assert result["result"] == "error"
assert result["message"] == "No mount exists with name backup_test"
async def test_api_update_dbus_error_mount_remains(
api_client: TestClient,
all_dbus_services: dict[str, DBusServiceMock],
@ -399,20 +380,6 @@ async def test_api_reload_mount(
]
async def test_api_reload_error_mount_missing(
api_client: TestClient, mount_propagation
):
"""Test reload mount API errors when mount does not exist."""
resp = await api_client.post("/mounts/backup_test/reload")
assert resp.status == 400
result = await resp.json()
assert result["result"] == "error"
assert (
result["message"]
== "Cannot reload 'backup_test', no mount exists with that name"
)
async def test_api_delete_mount(
api_client: TestClient,
coresys: CoreSys,
@ -435,20 +402,6 @@ async def test_api_delete_mount(
coresys.mounts.save_data.assert_called_once()
async def test_api_delete_error_mount_missing(
api_client: TestClient, mount_propagation
):
"""Test delete mount API errors when mount does not exist."""
resp = await api_client.delete("/mounts/backup_test")
assert resp.status == 400
result = await resp.json()
assert result["result"] == "error"
assert (
result["message"]
== "Cannot remove 'backup_test', no mount exists with that name"
)
async def test_api_create_backup_mount_sets_default(
api_client: TestClient,
coresys: CoreSys,
@ -903,3 +856,19 @@ async def test_api_read_only_backup_mount_invalid(
result = await resp.json()
assert result["result"] == "error"
assert "Backup mounts cannot be read only" in result["message"]
@pytest.mark.parametrize(
("method", "url"),
[
("put", "/mounts/bad"),
("delete", "/mounts/bad"),
("post", "/mounts/bad/reload"),
],
)
async def test_mount_not_found(api_client: TestClient, method: str, url: str):
"""Test mount not found error."""
resp = await api_client.request(method, url)
assert resp.status == 404
resp = await resp.json()
assert resp["message"] == "No mount exists with name bad"

View File

@ -400,3 +400,22 @@ async def test_api_network_vlan(
"id": Variant("u", 1),
"parent": Variant("s", "0c23631e-2118-355c-bbb0-8943229cb0d6"),
}
@pytest.mark.parametrize(
("method", "url"),
[
("get", "/network/interface/bad/info"),
("post", "/network/interface/bad/update"),
("get", "/network/interface/bad/accesspoints"),
("post", "/network/interface/bad/vlan/1"),
],
)
async def test_network_interface_not_found(
api_client: TestClient, method: str, url: str
):
"""Test network interface not found error."""
resp = await api_client.request(method, url)
assert resp.status == 404
body = await resp.json()
assert body["message"] == "Interface bad does not exist"

View File

@ -2,6 +2,7 @@
from unittest.mock import AsyncMock
from aiohttp.test_utils import TestClient
import pytest
from supervisor.const import (
@ -24,7 +25,7 @@ from supervisor.resolution.data import Issue, Suggestion
@pytest.mark.asyncio
async def test_api_resolution_base(coresys: CoreSys, api_client):
async def test_api_resolution_base(coresys: CoreSys, api_client: TestClient):
"""Test resolution manager api."""
coresys.resolution.unsupported = UnsupportedReason.OS
coresys.resolution.suggestions = Suggestion(
@ -42,7 +43,9 @@ async def test_api_resolution_base(coresys: CoreSys, api_client):
@pytest.mark.asyncio
async def test_api_resolution_dismiss_suggestion(coresys: CoreSys, api_client):
async def test_api_resolution_dismiss_suggestion(
coresys: CoreSys, api_client: TestClient
):
"""Test resolution manager suggestion apply api."""
coresys.resolution.suggestions = clear_backup = Suggestion(
SuggestionType.CLEAR_FULL_BACKUP, ContextType.SYSTEM
@ -54,7 +57,9 @@ async def test_api_resolution_dismiss_suggestion(coresys: CoreSys, api_client):
@pytest.mark.asyncio
async def test_api_resolution_apply_suggestion(coresys: CoreSys, api_client):
async def test_api_resolution_apply_suggestion(
coresys: CoreSys, api_client: TestClient
):
"""Test resolution manager suggestion apply api."""
coresys.resolution.suggestions = clear_backup = Suggestion(
SuggestionType.CLEAR_FULL_BACKUP, ContextType.SYSTEM
@ -82,7 +87,7 @@ async def test_api_resolution_apply_suggestion(coresys: CoreSys, api_client):
@pytest.mark.asyncio
async def test_api_resolution_dismiss_issue(coresys: CoreSys, api_client):
async def test_api_resolution_dismiss_issue(coresys: CoreSys, api_client: TestClient):
"""Test resolution manager issue apply api."""
coresys.resolution.issues = updated_failed = Issue(
IssueType.UPDATE_FAILED, ContextType.SYSTEM
@ -94,7 +99,7 @@ async def test_api_resolution_dismiss_issue(coresys: CoreSys, api_client):
@pytest.mark.asyncio
async def test_api_resolution_unhealthy(coresys: CoreSys, api_client):
async def test_api_resolution_unhealthy(coresys: CoreSys, api_client: TestClient):
"""Test resolution manager api."""
coresys.resolution.unhealthy = UnhealthyReason.DOCKER
@ -104,7 +109,7 @@ async def test_api_resolution_unhealthy(coresys: CoreSys, api_client):
@pytest.mark.asyncio
async def test_api_resolution_check_options(coresys: CoreSys, api_client):
async def test_api_resolution_check_options(coresys: CoreSys, api_client: TestClient):
"""Test client API with checks options."""
free_space = coresys.resolution.check.get("free_space")
@ -121,7 +126,7 @@ async def test_api_resolution_check_options(coresys: CoreSys, api_client):
@pytest.mark.asyncio
async def test_api_resolution_check_run(coresys: CoreSys, api_client):
async def test_api_resolution_check_run(coresys: CoreSys, api_client: TestClient):
"""Test client API with run check."""
coresys.core.state = CoreState.RUNNING
free_space = coresys.resolution.check.get("free_space")
@ -133,7 +138,9 @@ async def test_api_resolution_check_run(coresys: CoreSys, api_client):
assert free_space.run_check.called
async def test_api_resolution_suggestions_for_issue(coresys: CoreSys, api_client):
async def test_api_resolution_suggestions_for_issue(
coresys: CoreSys, api_client: TestClient
):
"""Test getting suggestions that fix an issue."""
coresys.resolution.issues = corrupt_repo = Issue(
IssueType.CORRUPT_REPOSITORY, ContextType.STORE, "repo_1"
@ -165,3 +172,39 @@ async def test_api_resolution_suggestions_for_issue(coresys: CoreSys, api_client
]
assert len(suggestion) == 1
assert suggestion[0]["auto"] is False
@pytest.mark.parametrize(
("method", "url"),
[("delete", "/resolution/issue/bad"), ("get", "/resolution/issue/bad/suggestions")],
)
async def test_issue_not_found(api_client: TestClient, method: str, url: str):
"""Test issue not found error."""
resp = await api_client.request(method, url)
assert resp.status == 404
body = await resp.json()
assert body["message"] == "The supplied UUID is not a valid issue"
@pytest.mark.parametrize(
("method", "url"),
[("delete", "/resolution/suggestion/bad"), ("post", "/resolution/suggestion/bad")],
)
async def test_suggestion_not_found(api_client: TestClient, method: str, url: str):
"""Test suggestion not found error."""
resp = await api_client.request(method, url)
assert resp.status == 404
body = await resp.json()
assert body["message"] == "The supplied UUID is not a valid suggestion"
@pytest.mark.parametrize(
("method", "url"),
[("post", "/resolution/check/bad/options"), ("post", "/resolution/check/bad/run")],
)
async def test_check_not_found(api_client: TestClient, method: str, url: str):
"""Test check not found error."""
resp = await api_client.request(method, url)
assert resp.status == 404
body = await resp.json()
assert body["message"] == "The supplied check slug is not available"

View File

@ -0,0 +1,16 @@
"""Test services API."""
from aiohttp.test_utils import TestClient
import pytest
@pytest.mark.parametrize(
("method", "url"),
[("get", "/services/bad"), ("post", "/services/bad"), ("delete", "/services/bad")],
)
async def test_service_not_found(api_client: TestClient, method: str, url: str):
"""Test service not found error."""
resp = await api_client.request(method, url)
assert resp.status == 404
body = await resp.json()
assert body["message"] == "Service does not exist"

View File

@ -4,6 +4,7 @@ import asyncio
from pathlib import Path
from unittest.mock import MagicMock, PropertyMock, patch
from aiohttp import ClientResponse
from aiohttp.test_utils import TestClient
import pytest
@ -235,7 +236,7 @@ async def test_api_detached_addon_changelog(
resp = await api_client.get(f"/{resource}/{install_addon_ssh.slug}/changelog")
assert resp.status == 200
result = await resp.text()
assert result == "Addon local_ssh with version latest does not exist in the store"
assert result == "Addon local_ssh does not exist in the store"
@pytest.mark.parametrize("resource", ["store/addons", "addons"])
@ -279,4 +280,72 @@ async def test_api_detached_addon_documentation(
resp = await api_client.get(f"/{resource}/{install_addon_ssh.slug}/documentation")
assert resp.status == 200
result = await resp.text()
assert result == "Addon local_ssh with version latest does not exist in the store"
assert result == "Addon local_ssh does not exist in the store"
async def get_message(resp: ClientResponse, json_expected: bool) -> str:
"""Get message from response based on response type."""
if json_expected:
body = await resp.json()
return body["message"]
return await resp.text()
@pytest.mark.parametrize(
("method", "url", "json_expected"),
[
("get", "/store/addons/bad", True),
("get", "/store/addons/bad/1", True),
("get", "/store/addons/bad/icon", False),
("get", "/store/addons/bad/logo", False),
("post", "/store/addons/bad/install", True),
("post", "/store/addons/bad/install/1", True),
("post", "/store/addons/bad/update", True),
("post", "/store/addons/bad/update/1", True),
# Legacy paths
("get", "/addons/bad/icon", False),
("get", "/addons/bad/logo", False),
("post", "/addons/bad/install", True),
("post", "/addons/bad/update", True),
],
)
async def test_store_addon_not_found(
api_client: TestClient, method: str, url: str, json_expected: bool
):
"""Test store addon not found error."""
resp = await api_client.request(method, url)
assert resp.status == 404
assert await get_message(resp, json_expected) == "Addon bad does not exist"
@pytest.mark.parametrize(
("method", "url"),
[
("post", "/store/addons/local_ssh/update"),
("post", "/store/addons/local_ssh/update/1"),
# Legacy paths
("post", "/addons/local_ssh/update"),
],
)
@pytest.mark.usefixtures("repository")
async def test_store_addon_not_installed(api_client: TestClient, method: str, url: str):
"""Test store addon not installed error."""
resp = await api_client.request(method, url)
assert resp.status == 400
body = await resp.json()
assert body["message"] == "Addon local_ssh is not installed"
@pytest.mark.parametrize(
("method", "url"),
[
("get", "/store/repositories/bad"),
("delete", "/store/repositories/bad"),
],
)
async def test_repository_not_found(api_client: TestClient, method: str, url: str):
"""Test repository not found error."""
resp = await api_client.request(method, url)
assert resp.status == 404
body = await resp.json()
assert body["message"] == "Repository bad does not exist in the store"