Compare commits

..

2 Commits

Author SHA1 Message Date
Mike Degatano
6d0f133616 Refactor to use common background_task utility in backups too 2025-08-27 20:17:57 +00:00
Mike Degatano
971abcf7f5 Add background option to update/install APIs 2025-08-27 03:02:08 +00:00
9 changed files with 337 additions and 206 deletions

View File

@@ -17,13 +17,13 @@ faust-cchardet==2.1.19
gitpython==3.1.45
jinja2==3.1.6
log-rate-limit==1.4.2
orjson==3.11.3
orjson==3.11.2
pulsectl==24.12.0
pyudev==0.24.3
PyYAML==6.0.2
requests==2.32.5
securetar==2025.2.1
sentry-sdk==2.35.1
sentry-sdk==2.35.0
setuptools==80.9.0
voluptuous==0.15.2
dbus-fast==2.44.3

View File

@@ -735,10 +735,6 @@ class RestAPI(CoreSysAttributes):
"/store/addons/{addon}/documentation",
api_store.addons_addon_documentation,
),
web.get(
"/store/addons/{addon}/availability",
api_store.addons_addon_availability,
),
web.post(
"/store/addons/{addon}/install", api_store.addons_addon_install
),

View File

@@ -3,7 +3,6 @@
from __future__ import annotations
import asyncio
from collections.abc import Callable
import errno
from io import IOBase
import logging
@@ -51,7 +50,6 @@ from ..const import (
)
from ..coresys import CoreSysAttributes
from ..exceptions import APIError, APIForbidden, APINotFound
from ..jobs import JobSchedulerOptions, SupervisorJob
from ..mounts.const import MountUsage
from ..resolution.const import UnhealthyReason
from .const import (
@@ -61,7 +59,7 @@ from .const import (
ATTR_LOCATIONS,
CONTENT_TYPE_TAR,
)
from .utils import api_process, api_validate
from .utils import api_process, api_validate, background_task
_LOGGER: logging.Logger = logging.getLogger(__name__)
@@ -76,6 +74,8 @@ RE_BACKUP_FILENAME = re.compile(r"^[^\\\/]+\.tar$")
# Remove: 2022.08
_ALL_FOLDERS = ALL_FOLDERS + [FOLDER_HOMEASSISTANT]
_freeze_state_filter = lambda new_state: new_state == CoreState.FREEZE
def _ensure_list(item: Any) -> list:
"""Ensure value is a list."""
@@ -289,41 +289,6 @@ class APIBackups(CoreSysAttributes):
f"Location {LOCATION_CLOUD_BACKUP} is only available for Home Assistant"
)
async def _background_backup_task(
self, backup_method: Callable, *args, **kwargs
) -> tuple[asyncio.Task, str]:
"""Start backup task in background and return task and job ID."""
event = asyncio.Event()
job, backup_task = cast(
tuple[SupervisorJob, asyncio.Task],
self.sys_jobs.schedule_job(
backup_method, JobSchedulerOptions(), *args, **kwargs
),
)
async def release_on_freeze(new_state: CoreState):
if new_state == CoreState.FREEZE:
event.set()
# Wait for system to get into freeze state before returning
# If the backup fails validation it will raise before getting there
listener = self.sys_bus.register_event(
BusEvent.SUPERVISOR_STATE_CHANGE, release_on_freeze
)
try:
event_task = self.sys_create_task(event.wait())
_, pending = await asyncio.wait(
(backup_task, event_task),
return_when=asyncio.FIRST_COMPLETED,
)
# It seems backup returned early (error or something), make sure to cancel
# the event task to avoid "Task was destroyed but it is pending!" errors.
if event_task in pending:
event_task.cancel()
return (backup_task, job.uuid)
finally:
self.sys_bus.remove_listener(listener)
@api_process
async def backup_full(self, request: web.Request):
"""Create full backup."""
@@ -342,8 +307,12 @@ class APIBackups(CoreSysAttributes):
body[ATTR_ADDITIONAL_LOCATIONS] = locations
background = body.pop(ATTR_BACKGROUND)
backup_task, job_id = await self._background_backup_task(
self.sys_backups.do_backup_full, **body
backup_task, job_id = await background_task(
self,
self.sys_backups.do_backup_full,
bus_event=BusEvent.SUPERVISOR_STATE_CHANGE,
event_filter=_freeze_state_filter,
**body,
)
if background and not backup_task.done():
@@ -378,8 +347,12 @@ class APIBackups(CoreSysAttributes):
body[ATTR_ADDONS] = list(self.sys_addons.local)
background = body.pop(ATTR_BACKGROUND)
backup_task, job_id = await self._background_backup_task(
self.sys_backups.do_backup_partial, **body
backup_task, job_id = await background_task(
self,
self.sys_backups.do_backup_partial,
bus_event=BusEvent.SUPERVISOR_STATE_CHANGE,
event_filter=_freeze_state_filter,
**body,
)
if background and not backup_task.done():
@@ -402,8 +375,13 @@ class APIBackups(CoreSysAttributes):
request, body.get(ATTR_LOCATION, backup.location)
)
background = body.pop(ATTR_BACKGROUND)
restore_task, job_id = await self._background_backup_task(
self.sys_backups.do_restore_full, backup, **body
restore_task, job_id = await background_task(
self,
self.sys_backups.do_restore_full,
backup,
bus_event=BusEvent.SUPERVISOR_STATE_CHANGE,
event_filter=_freeze_state_filter,
**body,
)
if background and not restore_task.done() or await restore_task:
@@ -422,8 +400,13 @@ class APIBackups(CoreSysAttributes):
request, body.get(ATTR_LOCATION, backup.location)
)
background = body.pop(ATTR_BACKGROUND)
restore_task, job_id = await self._background_backup_task(
self.sys_backups.do_restore_partial, backup, **body
restore_task, job_id = await background_task(
self,
self.sys_backups.do_restore_partial,
backup,
bus_event=BusEvent.SUPERVISOR_STATE_CHANGE,
event_filter=_freeze_state_filter,
**body,
)
if background and not restore_task.done() or await restore_task:

View File

@@ -20,6 +20,7 @@ from ..const import (
ATTR_CPU_PERCENT,
ATTR_IMAGE,
ATTR_IP_ADDRESS,
ATTR_JOB_ID,
ATTR_MACHINE,
ATTR_MEMORY_LIMIT,
ATTR_MEMORY_PERCENT,
@@ -37,8 +38,8 @@ from ..const import (
from ..coresys import CoreSysAttributes
from ..exceptions import APIDBMigrationInProgress, APIError
from ..validate import docker_image, network_port, version_tag
from .const import ATTR_FORCE, ATTR_SAFE_MODE
from .utils import api_process, api_validate
from .const import ATTR_BACKGROUND, ATTR_FORCE, ATTR_SAFE_MODE
from .utils import api_process, api_validate, background_task
_LOGGER: logging.Logger = logging.getLogger(__name__)
@@ -61,6 +62,7 @@ SCHEMA_UPDATE = vol.Schema(
{
vol.Optional(ATTR_VERSION): version_tag,
vol.Optional(ATTR_BACKUP): bool,
vol.Optional(ATTR_BACKGROUND, default=False): bool,
}
)
@@ -170,18 +172,25 @@ class APIHomeAssistant(CoreSysAttributes):
}
@api_process
async def update(self, request: web.Request) -> None:
async def update(self, request: web.Request) -> dict[str, str] | None:
"""Update Home Assistant."""
body = await api_validate(SCHEMA_UPDATE, request)
await self._check_offline_migration()
await asyncio.shield(
self.sys_homeassistant.core.update(
version=body.get(ATTR_VERSION, self.sys_homeassistant.latest_version),
backup=body.get(ATTR_BACKUP),
)
background = body[ATTR_BACKGROUND]
update_task, job_id = await background_task(
self,
self.sys_homeassistant.core.update,
job_names={"docker_interface_update", "backup_manager_partial_backup"},
version=body.get(ATTR_VERSION, self.sys_homeassistant.latest_version),
backup=body.get(ATTR_BACKUP),
)
if background and not update_task.done():
return {ATTR_JOB_ID: job_id}
return await update_task
@api_process
async def stop(self, request: web.Request) -> Awaitable[None]:
"""Stop Home Assistant."""

View File

@@ -199,25 +199,21 @@ class APIIngress(CoreSysAttributes):
url = f"{url}?{request.query_string}"
# Start proxy
try:
_LOGGER.debug("Proxing WebSocket to %s, upstream url: %s", addon.slug, url)
async with self.sys_websession.ws_connect(
url,
headers=source_header,
protocols=req_protocols,
autoclose=False,
autoping=False,
) as ws_client:
# Proxy requests
await asyncio.wait(
[
self.sys_create_task(_websocket_forward(ws_server, ws_client)),
self.sys_create_task(_websocket_forward(ws_client, ws_server)),
],
return_when=asyncio.FIRST_COMPLETED,
)
except TimeoutError:
_LOGGER.warning("WebSocket proxy to %s timed out", addon.slug)
async with self.sys_websession.ws_connect(
url,
headers=source_header,
protocols=req_protocols,
autoclose=False,
autoping=False,
) as ws_client:
# Proxy requests
await asyncio.wait(
[
self.sys_create_task(_websocket_forward(ws_server, ws_client)),
self.sys_create_task(_websocket_forward(ws_client, ws_server)),
],
return_when=asyncio.FIRST_COMPLETED,
)
return ws_server
@@ -290,7 +286,6 @@ class APIIngress(CoreSysAttributes):
aiohttp.ClientError,
aiohttp.ClientPayloadError,
ConnectionResetError,
ConnectionError,
) as err:
_LOGGER.error("Stream error with %s: %s", url, err)

View File

@@ -1,7 +1,6 @@
"""Init file for Supervisor Home Assistant RESTful API."""
import asyncio
from collections.abc import Awaitable
from pathlib import Path
from typing import Any, cast
@@ -36,6 +35,7 @@ from ..const import (
ATTR_ICON,
ATTR_INGRESS,
ATTR_INSTALLED,
ATTR_JOB_ID,
ATTR_LOGO,
ATTR_LONG_DESCRIPTION,
ATTR_MAINTAINER,
@@ -57,11 +57,13 @@ from ..exceptions import APIError, APIForbidden, APINotFound
from ..store.addon import AddonStore
from ..store.repository import Repository
from ..store.validate import validate_repository
from .const import CONTENT_TYPE_PNG, CONTENT_TYPE_TEXT
from .const import ATTR_BACKGROUND, CONTENT_TYPE_PNG, CONTENT_TYPE_TEXT
from .utils import background_task
SCHEMA_UPDATE = vol.Schema(
{
vol.Optional(ATTR_BACKUP): bool,
vol.Optional(ATTR_BACKGROUND, default=False): bool,
}
)
@@ -69,6 +71,12 @@ SCHEMA_ADD_REPOSITORY = vol.Schema(
{vol.Required(ATTR_REPOSITORY): vol.All(str, validate_repository)}
)
SCHEMA_INSTALL = vol.Schema(
{
vol.Optional(ATTR_BACKGROUND, default=False): bool,
}
)
def _read_static_text_file(path: Path) -> Any:
"""Read in a static text file asset for API output.
@@ -217,24 +225,46 @@ class APIStore(CoreSysAttributes):
}
@api_process
def addons_addon_install(self, request: web.Request) -> Awaitable[None]:
async def addons_addon_install(self, request: web.Request) -> dict[str, str] | None:
"""Install add-on."""
addon = self._extract_addon(request)
return asyncio.shield(self.sys_addons.install(addon.slug))
body = await api_validate(SCHEMA_INSTALL, request)
background = body[ATTR_BACKGROUND]
install_task, job_id = await background_task(
self, self.sys_addons.install, addon.slug, job_names={"addon_install"}
)
if background and not install_task.done():
return {ATTR_JOB_ID: job_id}
return await install_task
@api_process
async def addons_addon_update(self, request: web.Request) -> None:
async def addons_addon_update(self, request: web.Request) -> dict[str, str] | None:
"""Update add-on."""
addon = self._extract_addon(request, installed=True)
if addon == request.get(REQUEST_FROM):
raise APIForbidden(f"Add-on {addon.slug} can't update itself!")
body = await api_validate(SCHEMA_UPDATE, request)
background = body[ATTR_BACKGROUND]
if start_task := await asyncio.shield(
self.sys_addons.update(addon.slug, backup=body.get(ATTR_BACKUP))
):
update_task, job_id = await background_task(
self,
self.sys_addons.update,
addon.slug,
job_names={"backup_manager_partial_backup", "addon_update"},
backup=body.get(ATTR_BACKUP),
)
if background and not update_task.done():
return {ATTR_JOB_ID: job_id}
if start_task := await update_task:
await start_task
return None
@api_process
async def addons_addon_info(self, request: web.Request) -> dict[str, Any]:
@@ -297,12 +327,6 @@ class APIStore(CoreSysAttributes):
_read_static_text_file, addon.path_documentation
)
@api_process
async def addons_addon_availability(self, request: web.Request) -> None:
"""Check add-on availability for current system."""
addon = cast(AddonStore, self._extract_addon(request))
addon.validate_availability()
@api_process
async def repositories_list(self, request: web.Request) -> list[dict[str, Any]]:
"""Return all repositories."""

View File

@@ -1,7 +1,9 @@
"""Init file for Supervisor util for RESTful API."""
import asyncio
from collections.abc import Callable
import json
from typing import Any
from typing import Any, cast
from aiohttp import web
from aiohttp.hdrs import AUTHORIZATION
@@ -20,9 +22,11 @@ from ..const import (
REQUEST_FROM,
RESULT_ERROR,
RESULT_OK,
BusEvent,
)
from ..coresys import CoreSys, CoreSysAttributes
from ..exceptions import APIError, BackupFileNotFoundError, DockerAPIError, HassioError
from ..jobs import JobSchedulerOptions, SupervisorJob
from ..utils import check_exception_chain, get_message_from_exception_chain
from ..utils.json import json_dumps, json_loads as json_loads_util
from ..utils.log_format import format_message
@@ -198,3 +202,66 @@ async def api_validate(
data_validated[origin_value] = data[origin_value]
return data_validated
async def background_task(
coresys_obj: CoreSysAttributes,
task_method: Callable,
*args,
bus_event: BusEvent = BusEvent.SUPERVISOR_JOB_START,
event_filter: Callable[[Any], bool] | None = None,
job_names: set[str] | None = None,
**kwargs,
) -> tuple[asyncio.Task, str]:
"""Start task in background and return task and job ID.
Args:
coresys_obj: Instance that accesses coresys data using CoreSysAttributes
task_method: The method to execute in the background
bus_event: Event type to listen for which any initial validation has passed
event_filter: Function to determine if the event is the one specific to the job by the data
job_names: Alternative bus_event and event_filter. Validation considered passed when a named child job of primary one starts
*args: Arguments to pass to task_method
**kwargs: Keyword arguments to pass to task_method
Returns:
Tuple of (task, job_id)
"""
event = asyncio.Event()
job, task = cast(
tuple[SupervisorJob, asyncio.Task],
coresys_obj.sys_jobs.schedule_job(
task_method, JobSchedulerOptions(), *args, **kwargs
),
)
if job_names:
def child_job_filter(job_event: SupervisorJob) -> bool:
"""Return true if job is a child of main job and name is in set."""
return job_event.parent_id == job.uuid and job_event.name in job_names
event_filter = child_job_filter
async def release_on_job_start(event_data: Any):
"""Release if filter passes or no filter is provided."""
if not event_filter or event_filter(event_data):
event.set()
# Wait for provided event before returning
# If the task fails validation it should raise before getting there
listener = coresys_obj.sys_bus.register_event(bus_event, release_on_job_start)
try:
event_task = coresys_obj.sys_create_task(event.wait())
_, pending = await asyncio.wait(
(task, event_task),
return_when=asyncio.FIRST_COMPLETED,
)
# It seems task returned early (error or something), make sure to cancel
# the event task to avoid "Task was destroyed but it is pending!" errors.
if event_task in pending:
event_task.cancel()
return (task, job.uuid)
finally:
coresys_obj.sys_bus.remove_listener(listener)

View File

@@ -1,16 +1,20 @@
"""Test homeassistant api."""
import asyncio
from pathlib import Path
from unittest.mock import MagicMock, patch
from unittest.mock import MagicMock, PropertyMock, patch
from aiohttp.test_utils import TestClient
from awesomeversion import AwesomeVersion
import pytest
from supervisor.backups.manager import BackupManager
from supervisor.coresys import CoreSys
from supervisor.docker.interface import DockerInterface
from supervisor.homeassistant.api import APIState
from supervisor.homeassistant.core import HomeAssistantCore
from supervisor.homeassistant.module import HomeAssistant
from supervisor.jobs.decorator import _JOB_NAMES, Job
from tests.api import common_test_api_advanced_logs
from tests.common import load_json_fixture
@@ -188,3 +192,80 @@ async def test_force_stop_during_migration(api_client: TestClient, coresys: Core
with patch.object(HomeAssistantCore, "stop") as stop:
await api_client.post("/homeassistant/stop", json={"force": True})
stop.assert_called_once()
@pytest.mark.parametrize(
("make_backup", "backup_called", "update_called"),
[(True, True, False), (False, False, True)],
)
async def test_home_assistant_background_update(
api_client: TestClient,
coresys: CoreSys,
make_backup: bool,
backup_called: bool,
update_called: bool,
):
"""Test background update of Home Assistant."""
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
_JOB_NAMES.remove("docker_interface_update")
_JOB_NAMES.remove("backup_manager_partial_backup")
event = asyncio.Event()
mock_update_called = mock_backup_called = False
@Job(name="docker_interface_update")
async def mock_docker_interface_update(*args, **kwargs):
nonlocal mock_update_called
mock_update_called = True
await event.wait()
@Job(name="backup_manager_partial_backup")
async def mock_partial_backup(*args, **kwargs):
nonlocal mock_backup_called
mock_backup_called = True
await event.wait()
with (
patch.object(DockerInterface, "update", new=mock_docker_interface_update),
patch.object(BackupManager, "do_backup_partial", new=mock_partial_backup),
patch.object(
DockerInterface,
"version",
new=PropertyMock(return_value=AwesomeVersion("2025.8.0")),
),
):
resp = await api_client.post(
"/core/update",
json={"background": True, "backup": make_backup, "version": "2025.8.3"},
)
assert mock_backup_called is backup_called
assert mock_update_called is update_called
assert resp.status == 200
body = await resp.json()
assert (job := coresys.jobs.get_job(body["data"]["job_id"]))
assert job.name == "home_assistant_core_update"
event.set()
async def test_background_home_assistant_update_fails_fast(
api_client: TestClient, coresys: CoreSys
):
"""Test background Home Assistant update returns error not job if validation doesn't succeed."""
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
with (
patch.object(
DockerInterface,
"version",
new=PropertyMock(return_value=AwesomeVersion("2025.8.3")),
),
):
resp = await api_client.post(
"/core/update",
json={"background": True, "version": "2025.8.3"},
)
assert resp.status == 400
body = await resp.json()
assert body["message"] == "Version 2025.8.3 is already installed"

View File

@@ -6,11 +6,11 @@ from unittest.mock import AsyncMock, MagicMock, PropertyMock, patch
from aiohttp import ClientResponse
from aiohttp.test_utils import TestClient
from awesomeversion import AwesomeVersion
import pytest
from supervisor.addons.addon import Addon
from supervisor.arch import CpuArch
from supervisor.backups.manager import BackupManager
from supervisor.config import CoreConfig
from supervisor.const import AddonState
from supervisor.coresys import CoreSys
@@ -18,7 +18,7 @@ from supervisor.docker.addon import DockerAddon
from supervisor.docker.const import ContainerState
from supervisor.docker.interface import DockerInterface
from supervisor.docker.monitor import DockerContainerStateEvent
from supervisor.homeassistant.module import HomeAssistant
from supervisor.jobs.decorator import _JOB_NAMES, Job
from supervisor.store.addon import AddonStore
from supervisor.store.repository import Repository
@@ -307,7 +307,6 @@ async def get_message(resp: ClientResponse, json_expected: bool) -> str:
("post", "/store/addons/bad/install/1", True),
("post", "/store/addons/bad/update", True),
("post", "/store/addons/bad/update/1", True),
("get", "/store/addons/bad/availability", True),
# Legacy paths
("get", "/addons/bad/icon", False),
("get", "/addons/bad/logo", False),
@@ -395,129 +394,106 @@ async def test_api_store_addons_changelog_corrupted(
assert result == "Text with an invalid UTF-8 char: <20>"
async def test_api_store_addons_addon_availability_success(
api_client: TestClient, store_addon: AddonStore
):
"""Test /store/addons/{addon}/availability REST API - success case."""
resp = await api_client.get(f"/store/addons/{store_addon.slug}/availability")
@pytest.mark.usefixtures("test_repository", "tmp_supervisor_data")
async def test_addon_install_in_background(api_client: TestClient, coresys: CoreSys):
"""Test installing an addon in the background."""
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
_JOB_NAMES.remove("addon_install")
event = asyncio.Event()
@Job(name="addon_install")
async def mock_addon_install(*args, **kwargs):
await event.wait()
with patch.object(Addon, "install", new=mock_addon_install):
resp = await api_client.post(
"/store/addons/local_ssh/install", json={"background": True}
)
assert resp.status == 200
body = await resp.json()
assert (job := coresys.jobs.get_job(body["data"]["job_id"]))
assert job.name == "addon_manager_install"
event.set()
async def test_api_store_addons_addon_availability_arch_not_supported(
@pytest.mark.usefixtures("install_addon_ssh")
async def test_background_addon_install_fails_fast(
api_client: TestClient, coresys: CoreSys
):
"""Test /store/addons/{addon}/availability REST API - architecture not supported."""
# Create an addon with unsupported architecture
addon_obj = AddonStore(coresys, "test_arch_addon")
coresys.addons.store[addon_obj.slug] = addon_obj
"""Test background addon install returns error not job if validation fails."""
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
# Set addon config with unsupported architecture
addon_config = {
"advanced": False,
"arch": ["i386"], # Not supported on current system
"slug": "test_arch",
"description": "Test arch add-on",
"name": "Test Arch Add-on",
"repository": "test",
"stage": "stable",
"version": "1.0.0",
}
coresys.store.data.addons[addon_obj.slug] = addon_config
# Mock the system architecture to be different
with patch.object(CpuArch, "supported", new=PropertyMock(return_value=["amd64"])):
resp = await api_client.get(f"/store/addons/{addon_obj.slug}/availability")
assert resp.status == 400
result = await resp.json()
assert "not supported on this platform" in result["message"]
resp = await api_client.post(
"/store/addons/local_ssh/install", json={"background": True}
)
assert resp.status == 400
body = await resp.json()
assert body["message"] == "Add-on local_ssh is already installed"
@pytest.mark.parametrize("supported_machines", [["odroid-n2"], ["!qemux86-64"]])
async def test_api_store_addons_addon_availability_machine_not_supported(
api_client: TestClient, coresys: CoreSys, supported_machines: list[str]
@pytest.mark.parametrize(
("make_backup", "backup_called", "update_called"),
[(True, True, False), (False, False, True)],
)
@pytest.mark.usefixtures("test_repository", "tmp_supervisor_data")
async def test_addon_update_in_background(
api_client: TestClient,
coresys: CoreSys,
install_addon_ssh: Addon,
make_backup: bool,
backup_called: bool,
update_called: bool,
):
"""Test /store/addons/{addon}/availability REST API - machine not supported."""
# Create an addon with unsupported machine type
addon_obj = AddonStore(coresys, "test_machine_addon")
coresys.addons.store[addon_obj.slug] = addon_obj
"""Test updating an addon in the background."""
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
install_addon_ssh.data_store["version"] = "10.0.0"
_JOB_NAMES.remove("addon_update")
_JOB_NAMES.remove("backup_manager_partial_backup")
event = asyncio.Event()
mock_update_called = mock_backup_called = False
# Set addon config with unsupported machine
addon_config = {
"advanced": False,
"arch": ["amd64"],
"machine": supported_machines,
"slug": "test_machine",
"description": "Test machine add-on",
"name": "Test Machine Add-on",
"repository": "test",
"stage": "stable",
"version": "1.0.0",
}
coresys.store.data.addons[addon_obj.slug] = addon_config
@Job(name="addon_update")
async def mock_addon_update(*args, **kwargs):
nonlocal mock_update_called
mock_update_called = True
await event.wait()
# Mock the system machine to be different
with patch.object(CoreSys, "machine", new=PropertyMock(return_value="qemux86-64")):
resp = await api_client.get(f"/store/addons/{addon_obj.slug}/availability")
assert resp.status == 400
result = await resp.json()
assert "not supported on this machine" in result["message"]
@Job(name="backup_manager_partial_backup")
async def mock_partial_backup(*args, **kwargs):
nonlocal mock_backup_called
mock_backup_called = True
await event.wait()
async def test_api_store_addons_addon_availability_homeassistant_version_too_old(
api_client: TestClient, coresys: CoreSys, test_repository: Repository
):
"""Test /store/addons/{addon}/availability REST API - Home Assistant version too old."""
# Create an addon that requires newer Home Assistant version
addon_obj = AddonStore(coresys, "test_version_addon")
coresys.addons.store[addon_obj.slug] = addon_obj
# Set addon config with minimum Home Assistant version requirement
addon_config = {
"advanced": False,
"arch": ["amd64"],
"homeassistant": "2023.1.1", # Requires newer version than current
"slug": "test_version",
"description": "Test version add-on",
"name": "Test Version Add-on",
"repository": "test",
"stage": "stable",
"version": "1.0.0",
}
coresys.store.data.addons[addon_obj.slug] = addon_config
# Mock the Home Assistant version to be older
with patch.object(
HomeAssistant,
"version",
new=PropertyMock(return_value=AwesomeVersion("2022.1.1")),
with (
patch.object(Addon, "update", new=mock_addon_update),
patch.object(BackupManager, "do_backup_partial", new=mock_partial_backup),
):
resp = await api_client.get(f"/store/addons/{addon_obj.slug}/availability")
assert resp.status == 400
result = await resp.json()
assert (
"requires Home Assistant version 2023.1.1 or greater" in result["message"]
resp = await api_client.post(
"/store/addons/local_ssh/update",
json={"background": True, "backup": make_backup},
)
assert mock_backup_called is backup_called
assert mock_update_called is update_called
async def test_api_store_addons_addon_availability_installed_addon(
api_client: TestClient, install_addon_ssh: Addon
):
"""Test /store/addons/{addon}/availability REST API - installed addon checks against latest version."""
resp = await api_client.get("/store/addons/local_ssh/availability")
assert resp.status == 200
body = await resp.json()
assert (job := coresys.jobs.get_job(body["data"]["job_id"]))
assert job.name == "addon_manager_update"
event.set()
install_addon_ssh.data_store["version"] = AwesomeVersion("10.0.0")
install_addon_ssh.data_store["homeassistant"] = AwesomeVersion("2023.1.1")
# Mock the Home Assistant version to be older
with patch.object(
HomeAssistant,
"version",
new=PropertyMock(return_value=AwesomeVersion("2022.1.1")),
):
resp = await api_client.get("/store/addons/local_ssh/availability")
assert resp.status == 400
result = await resp.json()
assert (
"requires Home Assistant version 2023.1.1 or greater" in result["message"]
)
@pytest.mark.usefixtures("install_addon_ssh")
async def test_background_addon_update_fails_fast(
api_client: TestClient, coresys: CoreSys
):
"""Test background addon update returns error not job if validation doesn't succeed."""
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
resp = await api_client.post(
"/store/addons/local_ssh/update", json={"background": True}
)
assert resp.status == 400
body = await resp.json()
assert body["message"] == "No update available for add-on local_ssh"