Move write of core state to executor (#5720)

This commit is contained in:
Mike Degatano 2025-03-04 11:49:53 -05:00 committed by GitHub
parent 76e916a07e
commit 324b059970
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
57 changed files with 288 additions and 274 deletions

View File

@ -496,7 +496,7 @@ class BackupManager(FileConfiguration, JobGroup):
addon_start_tasks: list[Awaitable[None]] | None = None addon_start_tasks: list[Awaitable[None]] | None = None
try: try:
self.sys_core.state = CoreState.FREEZE await self.sys_core.set_state(CoreState.FREEZE)
async with backup.create(): async with backup.create():
# HomeAssistant Folder is for v1 # HomeAssistant Folder is for v1
@ -549,7 +549,7 @@ class BackupManager(FileConfiguration, JobGroup):
return backup return backup
finally: finally:
self.sys_core.state = CoreState.RUNNING await self.sys_core.set_state(CoreState.RUNNING)
@Job( @Job(
name="backup_manager_full_backup", name="backup_manager_full_backup",
@ -808,7 +808,7 @@ class BackupManager(FileConfiguration, JobGroup):
) )
_LOGGER.info("Full-Restore %s start", backup.slug) _LOGGER.info("Full-Restore %s start", backup.slug)
self.sys_core.state = CoreState.FREEZE await self.sys_core.set_state(CoreState.FREEZE)
try: try:
# Stop Home-Assistant / Add-ons # Stop Home-Assistant / Add-ons
@ -823,7 +823,7 @@ class BackupManager(FileConfiguration, JobGroup):
location=location, location=location,
) )
finally: finally:
self.sys_core.state = CoreState.RUNNING await self.sys_core.set_state(CoreState.RUNNING)
if success: if success:
_LOGGER.info("Full-Restore %s done", backup.slug) _LOGGER.info("Full-Restore %s done", backup.slug)
@ -878,7 +878,7 @@ class BackupManager(FileConfiguration, JobGroup):
) )
_LOGGER.info("Partial-Restore %s start", backup.slug) _LOGGER.info("Partial-Restore %s start", backup.slug)
self.sys_core.state = CoreState.FREEZE await self.sys_core.set_state(CoreState.FREEZE)
try: try:
success = await self._do_restore( success = await self._do_restore(
@ -890,7 +890,7 @@ class BackupManager(FileConfiguration, JobGroup):
location=location, location=location,
) )
finally: finally:
self.sys_core.state = CoreState.RUNNING await self.sys_core.set_state(CoreState.RUNNING)
if success: if success:
_LOGGER.info("Partial-Restore %s done", backup.slug) _LOGGER.info("Partial-Restore %s done", backup.slug)
@ -904,7 +904,7 @@ class BackupManager(FileConfiguration, JobGroup):
) )
async def freeze_all(self, timeout: float = DEFAULT_FREEZE_TIMEOUT) -> None: async def freeze_all(self, timeout: float = DEFAULT_FREEZE_TIMEOUT) -> None:
"""Freeze system to prepare for an external backup such as an image snapshot.""" """Freeze system to prepare for an external backup such as an image snapshot."""
self.sys_core.state = CoreState.FREEZE await self.sys_core.set_state(CoreState.FREEZE)
# Determine running addons # Determine running addons
installed = self.sys_addons.installed.copy() installed = self.sys_addons.installed.copy()
@ -957,7 +957,7 @@ class BackupManager(FileConfiguration, JobGroup):
if task if task
] ]
finally: finally:
self.sys_core.state = CoreState.RUNNING await self.sys_core.set_state(CoreState.RUNNING)
self._thaw_event.clear() self._thaw_event.clear()
self._thaw_task = None self._thaw_task = None

View File

@ -5,6 +5,7 @@ from collections.abc import Awaitable
from contextlib import suppress from contextlib import suppress
from datetime import timedelta from datetime import timedelta
import logging import logging
from typing import Self
from .const import ( from .const import (
ATTR_STARTUP, ATTR_STARTUP,
@ -39,7 +40,6 @@ class Core(CoreSysAttributes):
"""Initialize Supervisor object.""" """Initialize Supervisor object."""
self.coresys: CoreSys = coresys self.coresys: CoreSys = coresys
self._state: CoreState = CoreState.INITIALIZE self._state: CoreState = CoreState.INITIALIZE
self._write_run_state(self._state)
self.exit_code: int = 0 self.exit_code: int = 0
@property @property
@ -57,32 +57,38 @@ class Core(CoreSysAttributes):
"""Return true if the installation is healthy.""" """Return true if the installation is healthy."""
return len(self.sys_resolution.unhealthy) == 0 return len(self.sys_resolution.unhealthy) == 0
def _write_run_state(self, new_state: CoreState): async def _write_run_state(self):
"""Write run state for s6 service supervisor.""" """Write run state for s6 service supervisor."""
try: try:
RUN_SUPERVISOR_STATE.write_text(str(new_state), encoding="utf-8") await self.sys_run_in_executor(
RUN_SUPERVISOR_STATE.write_text, str(self._state), encoding="utf-8"
)
except OSError as err: except OSError as err:
_LOGGER.warning( _LOGGER.warning(
"Can't update the Supervisor state to %s: %s", new_state, err "Can't update the Supervisor state to %s: %s", self._state, err
) )
@state.setter async def post_init(self) -> Self:
def state(self, new_state: CoreState) -> None: """Post init actions that must be done in event loop."""
await self._write_run_state()
return self
async def set_state(self, new_state: CoreState) -> None:
"""Set core into new state.""" """Set core into new state."""
if self._state == new_state: if self._state == new_state:
return return
self._write_run_state(new_state)
self._state = new_state self._state = new_state
await self._write_run_state()
# Don't attempt to notify anyone on CLOSE as we're about to stop the event loop # Don't attempt to notify anyone on CLOSE as we're about to stop the event loop
if new_state != CoreState.CLOSE: if self._state != CoreState.CLOSE:
self.sys_bus.fire_event(BusEvent.SUPERVISOR_STATE_CHANGE, new_state) self.sys_bus.fire_event(BusEvent.SUPERVISOR_STATE_CHANGE, self._state)
# These will be received by HA after startup has completed which won't make sense # These will be received by HA after startup has completed which won't make sense
if new_state not in STARTING_STATES: if self._state not in STARTING_STATES:
self.sys_homeassistant.websocket.supervisor_update_event( self.sys_homeassistant.websocket.supervisor_update_event(
"info", {"state": new_state} "info", {"state": self._state}
) )
async def connect(self): async def connect(self):
@ -116,7 +122,7 @@ class Core(CoreSysAttributes):
async def setup(self): async def setup(self):
"""Start setting up supervisor orchestration.""" """Start setting up supervisor orchestration."""
self.state = CoreState.SETUP await self.set_state(CoreState.SETUP)
# Check internet on startup # Check internet on startup
await self.sys_supervisor.check_connectivity() await self.sys_supervisor.check_connectivity()
@ -196,7 +202,7 @@ class Core(CoreSysAttributes):
async def start(self): async def start(self):
"""Start Supervisor orchestration.""" """Start Supervisor orchestration."""
self.state = CoreState.STARTUP await self.set_state(CoreState.STARTUP)
# Check if system is healthy # Check if system is healthy
if not self.supported: if not self.supported:
@ -282,7 +288,7 @@ class Core(CoreSysAttributes):
self.sys_create_task(self.sys_updater.reload()) self.sys_create_task(self.sys_updater.reload())
self.sys_create_task(self.sys_resolution.healthcheck()) self.sys_create_task(self.sys_resolution.healthcheck())
self.state = CoreState.RUNNING await self.set_state(CoreState.RUNNING)
self.sys_homeassistant.websocket.supervisor_update_event( self.sys_homeassistant.websocket.supervisor_update_event(
"supervisor", {ATTR_STARTUP: "complete"} "supervisor", {ATTR_STARTUP: "complete"}
) )
@ -297,7 +303,7 @@ class Core(CoreSysAttributes):
return return
# don't process scheduler anymore # don't process scheduler anymore
self.state = CoreState.STOPPING await self.set_state(CoreState.STOPPING)
# Stage 1 # Stage 1
try: try:
@ -332,7 +338,7 @@ class Core(CoreSysAttributes):
except TimeoutError: except TimeoutError:
_LOGGER.warning("Stage 2: Force Shutdown!") _LOGGER.warning("Stage 2: Force Shutdown!")
self.state = CoreState.CLOSE await self.set_state(CoreState.CLOSE)
_LOGGER.info("Supervisor is down - %d", self.exit_code) _LOGGER.info("Supervisor is down - %d", self.exit_code)
self.sys_loop.stop() self.sys_loop.stop()
@ -340,7 +346,7 @@ class Core(CoreSysAttributes):
"""Shutdown all running containers in correct order.""" """Shutdown all running containers in correct order."""
# don't process scheduler anymore # don't process scheduler anymore
if self.state == CoreState.RUNNING: if self.state == CoreState.RUNNING:
self.state = CoreState.SHUTDOWN await self.set_state(CoreState.SHUTDOWN)
# Shutdown Application Add-ons, using Home Assistant API # Shutdown Application Add-ons, using Home Assistant API
await self.sys_addons.shutdown(AddonStartup.APPLICATION) await self.sys_addons.shutdown(AddonStartup.APPLICATION)

View File

@ -58,7 +58,7 @@ async def api_token_validation(aiohttp_client, coresys: CoreSys) -> TestClient:
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_api_security_system_initialize(api_system: TestClient, coresys: CoreSys): async def test_api_security_system_initialize(api_system: TestClient, coresys: CoreSys):
"""Test security.""" """Test security."""
coresys.core.state = CoreState.INITIALIZE await coresys.core.set_state(CoreState.INITIALIZE)
resp = await api_system.get("/supervisor/ping") resp = await api_system.get("/supervisor/ping")
result = await resp.json() result = await resp.json()
@ -69,7 +69,7 @@ async def test_api_security_system_initialize(api_system: TestClient, coresys: C
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_api_security_system_setup(api_system: TestClient, coresys: CoreSys): async def test_api_security_system_setup(api_system: TestClient, coresys: CoreSys):
"""Test security.""" """Test security."""
coresys.core.state = CoreState.SETUP await coresys.core.set_state(CoreState.SETUP)
resp = await api_system.get("/supervisor/ping") resp = await api_system.get("/supervisor/ping")
result = await resp.json() result = await resp.json()
@ -80,7 +80,7 @@ async def test_api_security_system_setup(api_system: TestClient, coresys: CoreSy
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_api_security_system_running(api_system: TestClient, coresys: CoreSys): async def test_api_security_system_running(api_system: TestClient, coresys: CoreSys):
"""Test security.""" """Test security."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
resp = await api_system.get("/supervisor/ping") resp = await api_system.get("/supervisor/ping")
assert resp.status == 200 assert resp.status == 200
@ -89,7 +89,7 @@ async def test_api_security_system_running(api_system: TestClient, coresys: Core
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_api_security_system_startup(api_system: TestClient, coresys: CoreSys): async def test_api_security_system_startup(api_system: TestClient, coresys: CoreSys):
"""Test security.""" """Test security."""
coresys.core.state = CoreState.STARTUP await coresys.core.set_state(CoreState.STARTUP)
resp = await api_system.get("/supervisor/ping") resp = await api_system.get("/supervisor/ping")
assert resp.status == 200 assert resp.status == 200

View File

@ -137,7 +137,7 @@ async def test_backup_to_location(
await coresys.mounts.create_mount(mount) await coresys.mounts.create_mount(mount)
coresys.mounts.default_backup_mount = mount coresys.mounts.default_backup_mount = mount
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
resp = await api_client.post( resp = await api_client.post(
"/backups/new/full", "/backups/new/full",
@ -178,7 +178,7 @@ async def test_backup_to_default(api_client: TestClient, coresys: CoreSys):
await coresys.mounts.create_mount(mount) await coresys.mounts.create_mount(mount)
coresys.mounts.default_backup_mount = mount coresys.mounts.default_backup_mount = mount
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
resp = await api_client.post( resp = await api_client.post(
"/backups/new/full", "/backups/new/full",
@ -196,7 +196,7 @@ async def test_api_freeze_thaw(
api_client: TestClient, coresys: CoreSys, ha_ws_client: AsyncMock api_client: TestClient, coresys: CoreSys, ha_ws_client: AsyncMock
): ):
"""Test manual freeze and thaw for external backup via API.""" """Test manual freeze and thaw for external backup via API."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
ha_ws_client.ha_version = AwesomeVersion("2022.1.0") ha_ws_client.ha_version = AwesomeVersion("2022.1.0")
@ -230,7 +230,7 @@ async def test_api_backup_exclude_database(
exclude_db_setting: bool, exclude_db_setting: bool,
): ):
"""Test backups exclude the database when specified.""" """Test backups exclude the database when specified."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
coresys.homeassistant.version = AwesomeVersion("2023.09.0") coresys.homeassistant.version = AwesomeVersion("2023.09.0")
coresys.homeassistant.backups_exclude_database = exclude_db_setting coresys.homeassistant.backups_exclude_database = exclude_db_setting
@ -278,7 +278,7 @@ async def test_api_backup_restore_background(
tmp_supervisor_data: Path, tmp_supervisor_data: Path,
): ):
"""Test background option on backup/restore APIs.""" """Test background option on backup/restore APIs."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
coresys.homeassistant.version = AwesomeVersion("2023.09.0") coresys.homeassistant.version = AwesomeVersion("2023.09.0")
(tmp_supervisor_data / "addons/local").mkdir(parents=True) (tmp_supervisor_data / "addons/local").mkdir(parents=True)
@ -364,7 +364,7 @@ async def test_api_backup_errors(
tmp_supervisor_data: Path, tmp_supervisor_data: Path,
): ):
"""Test error reporting in backup job.""" """Test error reporting in backup job."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
coresys.homeassistant.version = AwesomeVersion("2023.09.0") coresys.homeassistant.version = AwesomeVersion("2023.09.0")
(tmp_supervisor_data / "addons/local").mkdir(parents=True) (tmp_supervisor_data / "addons/local").mkdir(parents=True)
@ -435,7 +435,7 @@ async def test_api_backup_errors(
async def test_backup_immediate_errors(api_client: TestClient, coresys: CoreSys): async def test_backup_immediate_errors(api_client: TestClient, coresys: CoreSys):
"""Test backup errors that return immediately even in background mode.""" """Test backup errors that return immediately even in background mode."""
coresys.core.state = CoreState.FREEZE await coresys.core.set_state(CoreState.FREEZE)
resp = await api_client.post( resp = await api_client.post(
"/backups/new/full", "/backups/new/full",
json={"name": "Test", "background": True}, json={"name": "Test", "background": True},
@ -443,7 +443,7 @@ async def test_backup_immediate_errors(api_client: TestClient, coresys: CoreSys)
assert resp.status == 400 assert resp.status == 400
assert "freeze" in (await resp.json())["message"] assert "freeze" in (await resp.json())["message"]
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 0.5 coresys.hardware.disk.get_disk_free_space = lambda x: 0.5
resp = await api_client.post( resp = await api_client.post(
"/backups/new/partial", "/backups/new/partial",
@ -460,7 +460,7 @@ async def test_restore_immediate_errors(
mock_partial_backup: Backup, mock_partial_backup: Backup,
): ):
"""Test restore errors that return immediately even in background mode.""" """Test restore errors that return immediately even in background mode."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
resp = await api_client.post( resp = await api_client.post(
@ -634,7 +634,7 @@ async def test_backup_to_multiple_locations(
inputs: dict[str, Any], inputs: dict[str, Any],
): ):
"""Test making a backup to multiple locations.""" """Test making a backup to multiple locations."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
resp = await api_client.post( resp = await api_client.post(
@ -669,7 +669,7 @@ async def test_backup_with_extras(
inputs: dict[str, Any], inputs: dict[str, Any],
): ):
"""Test backup including extra metdata.""" """Test backup including extra metdata."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
resp = await api_client.post( resp = await api_client.post(
@ -909,7 +909,7 @@ async def test_partial_backup_all_addons(
install_addon_ssh: Addon, install_addon_ssh: Addon,
): ):
"""Test backup including extra metdata.""" """Test backup including extra metdata."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
with patch.object(Backup, "store_addons") as store_addons: with patch.object(Backup, "store_addons") as store_addons:
@ -928,7 +928,7 @@ async def test_restore_backup_from_location(
local_location: str | None, local_location: str | None,
): ):
"""Test restoring a backup from a specific location.""" """Test restoring a backup from a specific location."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
# Make a backup and a file to test with # Make a backup and a file to test with
@ -1000,7 +1000,7 @@ async def test_restore_backup_unencrypted_after_encrypted(
# We punt the ball on this one for this PR since this is a rare edge case. # We punt the ball on this one for this PR since this is a rare edge case.
backup.restore_dockerconfig = MagicMock() backup.restore_dockerconfig = MagicMock()
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
# Restore encrypted backup # Restore encrypted backup
@ -1050,7 +1050,7 @@ async def test_restore_homeassistant_adds_env(
): ):
"""Test restoring home assistant from backup adds env to container.""" """Test restoring home assistant from backup adds env to container."""
event = asyncio.Event() event = asyncio.Event()
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
coresys.homeassistant.version = AwesomeVersion("2025.1.0") coresys.homeassistant.version = AwesomeVersion("2025.1.0")
backup = await coresys.backups.do_backup_full() backup = await coresys.backups.do_backup_full()
@ -1134,7 +1134,7 @@ async def test_protected_backup(
api_client: TestClient, coresys: CoreSys, backup_type: str, options: dict[str, Any] api_client: TestClient, coresys: CoreSys, backup_type: str, options: dict[str, Any]
): ):
"""Test creating a protected backup.""" """Test creating a protected backup."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
resp = await api_client.post( resp = await api_client.post(
@ -1246,7 +1246,7 @@ async def test_missing_file_removes_location_from_cache(
backup_file: str, backup_file: str,
): ):
"""Test finding a missing file removes the location from cache.""" """Test finding a missing file removes the location from cache."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
backup_file = get_fixture_path(backup_file) backup_file = get_fixture_path(backup_file)
@ -1305,7 +1305,7 @@ async def test_missing_file_removes_backup_from_cache(
backup_file: str, backup_file: str,
): ):
"""Test finding a missing file removes the backup from cache if its the only one.""" """Test finding a missing file removes the backup from cache if its the only one."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
backup_file = get_fixture_path(backup_file) backup_file = get_fixture_path(backup_file)
@ -1331,7 +1331,7 @@ async def test_immediate_list_after_missing_file_restore(
api_client: TestClient, coresys: CoreSys api_client: TestClient, coresys: CoreSys
): ):
"""Test race with reload for missing file on restore does not error.""" """Test race with reload for missing file on restore does not error."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
backup_file = get_fixture_path("backup_example.tar") backup_file = get_fixture_path("backup_example.tar")

View File

@ -128,7 +128,7 @@ async def test_api_resolution_check_options(coresys: CoreSys, api_client: TestCl
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_api_resolution_check_run(coresys: CoreSys, api_client: TestClient): async def test_api_resolution_check_run(coresys: CoreSys, api_client: TestClient):
"""Test client API with run check.""" """Test client API with run check."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
free_space = coresys.resolution.check.get("free_space") free_space = coresys.resolution.check.get("free_space")
free_space.run_check = AsyncMock() free_space.run_check = AsyncMock()

View File

@ -48,7 +48,7 @@ from tests.dbus_service_mocks.systemd_unit import SystemdUnit as SystemdUnitServ
async def test_do_backup_full(coresys: CoreSys, backup_mock, install_addon_ssh): async def test_do_backup_full(coresys: CoreSys, backup_mock, install_addon_ssh):
"""Test creating Backup.""" """Test creating Backup."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
manager = await BackupManager(coresys).load_config() manager = await BackupManager(coresys).load_config()
@ -81,7 +81,7 @@ async def test_do_backup_full_with_filename(
coresys: CoreSys, filename: str, filename_expected: str, backup_mock coresys: CoreSys, filename: str, filename_expected: str, backup_mock
): ):
"""Test creating Backup with a specific file name.""" """Test creating Backup with a specific file name."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
manager = await BackupManager(coresys).load_config() manager = await BackupManager(coresys).load_config()
@ -99,7 +99,7 @@ async def test_do_backup_full_uncompressed(
coresys: CoreSys, backup_mock, install_addon_ssh coresys: CoreSys, backup_mock, install_addon_ssh
): ):
"""Test creating Backup.""" """Test creating Backup."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
manager = await BackupManager(coresys).load_config() manager = await BackupManager(coresys).load_config()
@ -129,7 +129,7 @@ async def test_do_backup_partial_minimal(
coresys: CoreSys, backup_mock, install_addon_ssh coresys: CoreSys, backup_mock, install_addon_ssh
): ):
"""Test creating minimal partial Backup.""" """Test creating minimal partial Backup."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
manager = await BackupManager(coresys).load_config() manager = await BackupManager(coresys).load_config()
@ -156,7 +156,7 @@ async def test_do_backup_partial_minimal_uncompressed(
coresys: CoreSys, backup_mock, install_addon_ssh coresys: CoreSys, backup_mock, install_addon_ssh
): ):
"""Test creating minimal partial Backup.""" """Test creating minimal partial Backup."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
manager = await BackupManager(coresys).load_config() manager = await BackupManager(coresys).load_config()
@ -185,7 +185,7 @@ async def test_do_backup_partial_maximal(
coresys: CoreSys, backup_mock, install_addon_ssh coresys: CoreSys, backup_mock, install_addon_ssh
): ):
"""Test creating maximal partial Backup.""" """Test creating maximal partial Backup."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
manager = await BackupManager(coresys).load_config() manager = await BackupManager(coresys).load_config()
@ -217,7 +217,7 @@ async def test_do_backup_partial_maximal(
async def test_do_restore_full(coresys: CoreSys, full_backup_mock, install_addon_ssh): async def test_do_restore_full(coresys: CoreSys, full_backup_mock, install_addon_ssh):
"""Test restoring full Backup.""" """Test restoring full Backup."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
coresys.homeassistant.core.start = AsyncMock(return_value=None) coresys.homeassistant.core.start = AsyncMock(return_value=None)
coresys.homeassistant.core.stop = AsyncMock(return_value=None) coresys.homeassistant.core.stop = AsyncMock(return_value=None)
@ -248,7 +248,7 @@ async def test_do_restore_full_different_addon(
coresys: CoreSys, full_backup_mock, install_addon_ssh coresys: CoreSys, full_backup_mock, install_addon_ssh
): ):
"""Test restoring full Backup with different addons than installed.""" """Test restoring full Backup with different addons than installed."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
coresys.homeassistant.core.start = AsyncMock(return_value=None) coresys.homeassistant.core.start = AsyncMock(return_value=None)
coresys.homeassistant.core.stop = AsyncMock(return_value=None) coresys.homeassistant.core.stop = AsyncMock(return_value=None)
@ -280,7 +280,7 @@ async def test_do_restore_partial_minimal(
coresys: CoreSys, partial_backup_mock, install_addon_ssh coresys: CoreSys, partial_backup_mock, install_addon_ssh
): ):
"""Test restoring partial Backup minimal.""" """Test restoring partial Backup minimal."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
coresys.homeassistant.core.start = AsyncMock(return_value=None) coresys.homeassistant.core.start = AsyncMock(return_value=None)
coresys.homeassistant.core.stop = AsyncMock(return_value=None) coresys.homeassistant.core.stop = AsyncMock(return_value=None)
@ -303,7 +303,7 @@ async def test_do_restore_partial_minimal(
async def test_do_restore_partial_maximal(coresys: CoreSys, partial_backup_mock): async def test_do_restore_partial_maximal(coresys: CoreSys, partial_backup_mock):
"""Test restoring partial Backup minimal.""" """Test restoring partial Backup minimal."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
coresys.homeassistant.core.start = AsyncMock(return_value=None) coresys.homeassistant.core.start = AsyncMock(return_value=None)
coresys.homeassistant.core.stop = AsyncMock(return_value=None) coresys.homeassistant.core.stop = AsyncMock(return_value=None)
@ -334,7 +334,7 @@ async def test_fail_invalid_full_backup(
coresys: CoreSys, full_backup_mock: MagicMock, partial_backup_mock: MagicMock coresys: CoreSys, full_backup_mock: MagicMock, partial_backup_mock: MagicMock
): ):
"""Test restore fails with invalid backup.""" """Test restore fails with invalid backup."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
manager = await BackupManager(coresys).load_config() manager = await BackupManager(coresys).load_config()
@ -366,7 +366,7 @@ async def test_fail_invalid_partial_backup(
coresys: CoreSys, partial_backup_mock: MagicMock coresys: CoreSys, partial_backup_mock: MagicMock
): ):
"""Test restore fails with invalid backup.""" """Test restore fails with invalid backup."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
manager = await BackupManager(coresys).load_config() manager = await BackupManager(coresys).load_config()
@ -403,7 +403,7 @@ async def test_backup_error(
capture_exception: Mock, capture_exception: Mock,
): ):
"""Test error captured when backup fails.""" """Test error captured when backup fails."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
backup_mock.return_value.store_folders.side_effect = (err := OSError()) backup_mock.return_value.store_folders.side_effect = (err := OSError())
@ -416,7 +416,7 @@ async def test_restore_error(
coresys: CoreSys, full_backup_mock: MagicMock, capture_exception: Mock coresys: CoreSys, full_backup_mock: MagicMock, capture_exception: Mock
): ):
"""Test restoring full Backup with errors.""" """Test restoring full Backup with errors."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
coresys.homeassistant.core.start = AsyncMock(return_value=None) coresys.homeassistant.core.start = AsyncMock(return_value=None)
@ -475,7 +475,7 @@ async def test_backup_media_with_mounts(
assert (mount_dir := coresys.config.path_media / "media_test").is_dir() assert (mount_dir := coresys.config.path_media / "media_test").is_dir()
# Make a partial backup # Make a partial backup
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
backup: Backup = await coresys.backups.do_backup_partial("test", folders=["media"]) backup: Backup = await coresys.backups.do_backup_partial("test", folders=["media"])
@ -532,7 +532,7 @@ async def test_backup_media_with_mounts_retains_files(
) )
# Make a partial backup # Make a partial backup
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
backup: Backup = await coresys.backups.do_backup_partial("test", folders=["media"]) backup: Backup = await coresys.backups.do_backup_partial("test", folders=["media"])
@ -599,7 +599,7 @@ async def test_backup_share_with_mounts(
assert (mount_dir := coresys.config.path_share / "share_test").is_dir() assert (mount_dir := coresys.config.path_share / "share_test").is_dir()
# Make a partial backup # Make a partial backup
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
backup: Backup = await coresys.backups.do_backup_partial("test", folders=["share"]) backup: Backup = await coresys.backups.do_backup_partial("test", folders=["share"])
@ -646,7 +646,7 @@ async def test_full_backup_to_mount(
assert coresys.backups.backup_locations["backup_test"] == mount_dir assert coresys.backups.backup_locations["backup_test"] == mount_dir
# Make a backup and add it to mounts. Confirm it exists in the right place # Make a backup and add it to mounts. Confirm it exists in the right place
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
backup: Backup = await coresys.backups.do_backup_full("test", location=mount) backup: Backup = await coresys.backups.do_backup_full("test", location=mount)
assert (mount_dir / f"{backup.slug}.tar").exists() assert (mount_dir / f"{backup.slug}.tar").exists()
@ -692,7 +692,7 @@ async def test_partial_backup_to_mount(
assert coresys.backups.backup_locations["backup_test"] == mount_dir assert coresys.backups.backup_locations["backup_test"] == mount_dir
# Make a backup and add it to mounts. Confirm it exists in the right place # Make a backup and add it to mounts. Confirm it exists in the right place
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
with patch.object( with patch.object(
@ -746,7 +746,7 @@ async def test_backup_to_down_mount_error(
# Attempt to make a backup which fails because is_mount on directory is false # Attempt to make a backup which fails because is_mount on directory is false
mock_is_mount.return_value = False mock_is_mount.return_value = False
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
with pytest.raises(BackupMountDownError): with pytest.raises(BackupMountDownError):
await coresys.backups.do_backup_full("test", location=mount) await coresys.backups.do_backup_full("test", location=mount)
@ -780,7 +780,7 @@ async def test_backup_to_local_with_default(
coresys.mounts.default_backup_mount = mount coresys.mounts.default_backup_mount = mount
# Make a backup for local. Confirm it exists in the right place # Make a backup for local. Confirm it exists in the right place
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
with patch.object( with patch.object(
@ -820,7 +820,7 @@ async def test_backup_to_default(
coresys.mounts.default_backup_mount = mount coresys.mounts.default_backup_mount = mount
# Make a backup for default. Confirm it exists in the right place # Make a backup for default. Confirm it exists in the right place
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
with patch.object( with patch.object(
@ -861,7 +861,7 @@ async def test_backup_to_default_mount_down_error(
# Attempt to make a backup which fails because is_mount on directory is false # Attempt to make a backup which fails because is_mount on directory is false
mock_is_mount.return_value = False mock_is_mount.return_value = False
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
with pytest.raises(BackupMountDownError): with pytest.raises(BackupMountDownError):
@ -914,7 +914,7 @@ async def test_backup_with_healthcheck(
container.status = "running" container.status = "running"
container.attrs["Config"] = {"Healthcheck": "exists"} container.attrs["Config"] = {"Healthcheck": "exists"}
install_addon_ssh.path_data.mkdir() install_addon_ssh.path_data.mkdir()
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
await install_addon_ssh.load() await install_addon_ssh.load()
await asyncio.sleep(0) await asyncio.sleep(0)
@ -992,7 +992,7 @@ async def test_restore_with_healthcheck(
container.status = "running" container.status = "running"
container.attrs["Config"] = {"Healthcheck": "exists"} container.attrs["Config"] = {"Healthcheck": "exists"}
install_addon_ssh.path_data.mkdir() install_addon_ssh.path_data.mkdir()
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
await install_addon_ssh.load() await install_addon_ssh.load()
await asyncio.sleep(0) await asyncio.sleep(0)
@ -1093,7 +1093,7 @@ async def test_backup_progress(
"""Test progress is tracked during backups.""" """Test progress is tracked during backups."""
container.status = "running" container.status = "running"
install_addon_ssh.path_data.mkdir() install_addon_ssh.path_data.mkdir()
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
with ( with (
@ -1193,7 +1193,7 @@ async def test_restore_progress(
container.status = "running" container.status = "running"
install_addon_ssh.path_data.mkdir() install_addon_ssh.path_data.mkdir()
install_addon_ssh.state = AddonState.STARTED install_addon_ssh.state = AddonState.STARTED
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
full_backup: Backup = await coresys.backups.do_backup_full() full_backup: Backup = await coresys.backups.do_backup_full()
@ -1368,7 +1368,7 @@ async def test_freeze_thaw(
"""Test manual freeze and thaw for external snapshots.""" """Test manual freeze and thaw for external snapshots."""
container.status = "running" container.status = "running"
install_addon_ssh.path_data.mkdir() install_addon_ssh.path_data.mkdir()
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
container.exec_run.return_value = (0, None) container.exec_run.return_value = (0, None)
ha_ws_client.ha_version = AwesomeVersion("2022.1.0") ha_ws_client.ha_version = AwesomeVersion("2022.1.0")
@ -1450,7 +1450,7 @@ async def test_freeze_thaw_timeout(
path_extern, path_extern,
): ):
"""Test manual freeze ends due to timeout expiration.""" """Test manual freeze ends due to timeout expiration."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
ha_ws_client.ha_version = AwesomeVersion("2022.1.0") ha_ws_client.ha_version = AwesomeVersion("2022.1.0")
@ -1474,7 +1474,7 @@ async def test_freeze_thaw_timeout(
async def test_cannot_manually_thaw_normal_freeze(coresys: CoreSys): async def test_cannot_manually_thaw_normal_freeze(coresys: CoreSys):
"""Test thaw_all cannot be used unless freeze was started by freeze_all method.""" """Test thaw_all cannot be used unless freeze was started by freeze_all method."""
coresys.core.state = CoreState.FREEZE await coresys.core.set_state(CoreState.FREEZE)
with pytest.raises(BackupError): with pytest.raises(BackupError):
await coresys.backups.thaw_all() await coresys.backups.thaw_all()
@ -1487,7 +1487,7 @@ async def test_restore_only_reloads_ingress_on_change(
): ):
"""Test restore only tells core to reload ingress when something has changed.""" """Test restore only tells core to reload ingress when something has changed."""
install_addon_ssh.path_data.mkdir() install_addon_ssh.path_data.mkdir()
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
backup_no_ingress: Backup = await coresys.backups.do_backup_partial( backup_no_ingress: Backup = await coresys.backups.do_backup_partial(
@ -1547,7 +1547,7 @@ async def test_restore_new_addon(
path_extern, path_extern,
): ):
"""Test restore installing new addon.""" """Test restore installing new addon."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
assert not install_addon_example.path_data.exists() assert not install_addon_example.path_data.exists()
@ -1578,7 +1578,7 @@ async def test_restore_preserves_data_config(
path_extern, path_extern,
): ):
"""Test restore preserves data and config.""" """Test restore preserves data and config."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
install_addon_example.path_data.mkdir() install_addon_example.path_data.mkdir()
@ -1616,7 +1616,7 @@ async def test_backup_to_mount_bypasses_free_space_condition(
mock_is_mount, mock_is_mount,
): ):
"""Test backing up to a mount bypasses the check on local free space.""" """Test backing up to a mount bypasses the check on local free space."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda _: 0.1 coresys.hardware.disk.get_disk_free_space = lambda _: 0.1
# These fail due to lack of local free space # These fail due to lack of local free space
@ -1669,7 +1669,7 @@ async def test_skip_homeassistant_database(
path_extern, path_extern,
): ):
"""Test exclude database option skips database in backup.""" """Test exclude database option skips database in backup."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
coresys.jobs.ignore_conditions = [ coresys.jobs.ignore_conditions = [
JobCondition.INTERNET_HOST, JobCondition.INTERNET_HOST,
@ -1812,7 +1812,7 @@ async def test_monitoring_after_full_restore(
coresys: CoreSys, full_backup_mock, install_addon_ssh, container coresys: CoreSys, full_backup_mock, install_addon_ssh, container
): ):
"""Test monitoring of addon state still works after full restore.""" """Test monitoring of addon state still works after full restore."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
coresys.homeassistant.core.start = AsyncMock(return_value=None) coresys.homeassistant.core.start = AsyncMock(return_value=None)
coresys.homeassistant.core.stop = AsyncMock(return_value=None) coresys.homeassistant.core.stop = AsyncMock(return_value=None)
@ -1833,7 +1833,7 @@ async def test_monitoring_after_partial_restore(
coresys: CoreSys, partial_backup_mock, install_addon_ssh, container coresys: CoreSys, partial_backup_mock, install_addon_ssh, container
): ):
"""Test monitoring of addon state still works after full restore.""" """Test monitoring of addon state still works after full restore."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
manager = await BackupManager(coresys).load_config() manager = await BackupManager(coresys).load_config()
@ -1866,7 +1866,7 @@ async def test_core_pre_backup_actions_failed(
path_extern, path_extern,
): ):
"""Test pre-backup actions failed in HA core stops backup.""" """Test pre-backup actions failed in HA core stops backup."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
ha_ws_client.ha_version = AwesomeVersion("2024.7.0") ha_ws_client.ha_version = AwesomeVersion("2024.7.0")
ha_ws_client.async_send_command.return_value = { ha_ws_client.async_send_command.return_value = {
@ -2028,7 +2028,7 @@ async def test_backup_remove_one_location_of_multiple(coresys: CoreSys):
@pytest.mark.usefixtures("tmp_supervisor_data") @pytest.mark.usefixtures("tmp_supervisor_data")
async def test_addon_backup_excludes(coresys: CoreSys, install_addon_example: Addon): async def test_addon_backup_excludes(coresys: CoreSys, install_addon_example: Addon):
"""Test backup excludes option for addons.""" """Test backup excludes option for addons."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
install_addon_example.path_data.mkdir(parents=True) install_addon_example.path_data.mkdir(parents=True)

View File

@ -315,6 +315,7 @@ async def coresys(
with ( with (
patch("supervisor.bootstrap.initialize_system"), patch("supervisor.bootstrap.initialize_system"),
patch("supervisor.utils.sentry.sentry_sdk.init"), patch("supervisor.utils.sentry.sentry_sdk.init"),
patch("supervisor.core.Core._write_run_state"),
): ):
coresys_obj = await initialize_coresys() coresys_obj = await initialize_coresys()
@ -389,7 +390,7 @@ async def coresys(
async def ha_ws_client(coresys: CoreSys) -> AsyncMock: async def ha_ws_client(coresys: CoreSys) -> AsyncMock:
"""Return HA WS client mock for assertions.""" """Return HA WS client mock for assertions."""
# Set Supervisor Core state to RUNNING, otherwise WS events won't be delivered # Set Supervisor Core state to RUNNING, otherwise WS events won't be delivered
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
await asyncio.sleep(0) await asyncio.sleep(0)
client = coresys.homeassistant.websocket._client client = coresys.homeassistant.websocket._client
client.async_send_command.reset_mock() client.async_send_command.reset_mock()
@ -501,10 +502,14 @@ def store_manager(coresys: CoreSys):
@pytest.fixture @pytest.fixture
def run_supervisor_state() -> Generator[MagicMock]: def run_supervisor_state(request: pytest.FixtureRequest) -> Generator[MagicMock]:
"""Fixture to simulate Supervisor state file in /run/supervisor.""" """Fixture to simulate Supervisor state file in /run/supervisor."""
with patch("supervisor.core.RUN_SUPERVISOR_STATE") as mock_run: if getattr(request, "param", "test_file"):
yield mock_run with patch("supervisor.core.RUN_SUPERVISOR_STATE") as mock_run:
yield mock_run
else:
with patch("supervisor.core.Core._write_run_state") as mock_write_state:
yield mock_write_state
@pytest.fixture @pytest.fixture

View File

@ -49,11 +49,11 @@ async def test_load(
assert coresys.homeassistant.secrets.secrets == {"hello": "world"} assert coresys.homeassistant.secrets.secrets == {"hello": "world"}
coresys.core.state = CoreState.SETUP await coresys.core.set_state(CoreState.SETUP)
await coresys.homeassistant.websocket.async_send_message({"lorem": "ipsum"}) await coresys.homeassistant.websocket.async_send_message({"lorem": "ipsum"})
ha_ws_client.async_send_command.assert_not_called() ha_ws_client.async_send_command.assert_not_called()
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
await asyncio.sleep(0) await asyncio.sleep(0)
assert ha_ws_client.async_send_command.call_args_list[0][0][0] == {"lorem": "ipsum"} assert ha_ws_client.async_send_command.call_args_list[0][0][0] == {"lorem": "ipsum"}

View File

@ -57,14 +57,14 @@ async def test_send_command_old_core_version(
async def test_send_message_during_startup(coresys: CoreSys, ha_ws_client: AsyncMock): async def test_send_message_during_startup(coresys: CoreSys, ha_ws_client: AsyncMock):
"""Test websocket messages queue during startup.""" """Test websocket messages queue during startup."""
await coresys.homeassistant.websocket.load() await coresys.homeassistant.websocket.load()
coresys.core.state = CoreState.SETUP await coresys.core.set_state(CoreState.SETUP)
await coresys.homeassistant.websocket.async_supervisor_update_event( await coresys.homeassistant.websocket.async_supervisor_update_event(
"test", {"lorem": "ipsum"} "test", {"lorem": "ipsum"}
) )
ha_ws_client.async_send_command.assert_not_called() ha_ws_client.async_send_command.assert_not_called()
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
await asyncio.sleep(0) await asyncio.sleep(0)
assert ha_ws_client.async_send_command.call_count == 2 assert ha_ws_client.async_send_command.call_count == 2

View File

@ -234,7 +234,7 @@ async def test_host_connectivity_disabled(
"""Test host connectivity check disabled.""" """Test host connectivity check disabled."""
await coresys.host.network.load() await coresys.host.network.load()
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
await asyncio.sleep(0) await asyncio.sleep(0)
ha_ws_client.async_send_command.reset_mock() ha_ws_client.async_send_command.reset_mock()

View File

@ -75,7 +75,7 @@ async def test_internet(
system_result: bool | None, system_result: bool | None,
): ):
"""Test the internet decorator.""" """Test the internet decorator."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
reset_last_call(Supervisor.check_connectivity) reset_last_call(Supervisor.check_connectivity)
class TestClass: class TestClass:
@ -241,10 +241,10 @@ async def test_running(coresys: CoreSys):
test = TestClass(coresys) test = TestClass(coresys)
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
assert await test.execute() assert await test.execute()
coresys.core.state = CoreState.FREEZE await coresys.core.set_state(CoreState.FREEZE)
assert not await test.execute() assert not await test.execute()
coresys.jobs.ignore_conditions = [JobCondition.RUNNING] coresys.jobs.ignore_conditions = [JobCondition.RUNNING]
@ -272,10 +272,10 @@ async def test_exception_conditions(coresys: CoreSys):
test = TestClass(coresys) test = TestClass(coresys)
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
assert await test.execute() assert await test.execute()
coresys.core.state = CoreState.FREEZE await coresys.core.set_state(CoreState.FREEZE)
with pytest.raises(HassioError): with pytest.raises(HassioError):
await test.execute() await test.execute()

View File

@ -107,22 +107,22 @@ def test_is_dev(coresys):
assert filter_data(coresys, SAMPLE_EVENT, {}) is None assert filter_data(coresys, SAMPLE_EVENT, {}) is None
def test_not_started(coresys): async def test_not_started(coresys):
"""Test if supervisor not fully started.""" """Test if supervisor not fully started."""
coresys.config.diagnostics = True coresys.config.diagnostics = True
coresys.core.state = CoreState.INITIALIZE await coresys.core.set_state(CoreState.INITIALIZE)
assert filter_data(coresys, SAMPLE_EVENT, {}) == SAMPLE_EVENT assert filter_data(coresys, SAMPLE_EVENT, {}) == SAMPLE_EVENT
coresys.core.state = CoreState.SETUP await coresys.core.set_state(CoreState.SETUP)
assert filter_data(coresys, SAMPLE_EVENT, {}) == SAMPLE_EVENT assert filter_data(coresys, SAMPLE_EVENT, {}) == SAMPLE_EVENT
def test_defaults(coresys): async def test_defaults(coresys):
"""Test event defaults.""" """Test event defaults."""
coresys.config.diagnostics = True coresys.config.diagnostics = True
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
with patch("shutil.disk_usage", return_value=(42, 42, 2 * (1024.0**3))): with patch("shutil.disk_usage", return_value=(42, 42, 2 * (1024.0**3))):
filtered = filter_data(coresys, SAMPLE_EVENT, {}) filtered = filter_data(coresys, SAMPLE_EVENT, {})
@ -135,12 +135,12 @@ def test_defaults(coresys):
assert filtered["user"]["id"] == coresys.machine_id assert filtered["user"]["id"] == coresys.machine_id
def test_sanitize_user_hostname(coresys): async def test_sanitize_user_hostname(coresys):
"""Test user hostname event sanitation.""" """Test user hostname event sanitation."""
event = SAMPLE_EVENT_AIOHTTP_EXTERNAL event = SAMPLE_EVENT_AIOHTTP_EXTERNAL
coresys.config.diagnostics = True coresys.config.diagnostics = True
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
with patch("shutil.disk_usage", return_value=(42, 42, 2 * (1024.0**3))): with patch("shutil.disk_usage", return_value=(42, 42, 2 * (1024.0**3))):
filtered = filter_data(coresys, event, {}) filtered = filter_data(coresys, event, {})
@ -154,25 +154,25 @@ def test_sanitize_user_hostname(coresys):
) )
def test_sanitize_internal(coresys): async def test_sanitize_internal(coresys):
"""Test internal event sanitation.""" """Test internal event sanitation."""
event = SAMPLE_EVENT_AIOHTTP_INTERNAL event = SAMPLE_EVENT_AIOHTTP_INTERNAL
coresys.config.diagnostics = True coresys.config.diagnostics = True
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
with patch("shutil.disk_usage", return_value=(42, 42, 2 * (1024.0**3))): with patch("shutil.disk_usage", return_value=(42, 42, 2 * (1024.0**3))):
filtered = filter_data(coresys, event, {}) filtered = filter_data(coresys, event, {})
assert filtered == event assert filtered == event
def test_issues_on_report(coresys): async def test_issues_on_report(coresys):
"""Attach issue to report.""" """Attach issue to report."""
coresys.resolution.create_issue(IssueType.FATAL_ERROR, ContextType.SYSTEM) coresys.resolution.create_issue(IssueType.FATAL_ERROR, ContextType.SYSTEM)
coresys.config.diagnostics = True coresys.config.diagnostics = True
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
with patch("shutil.disk_usage", return_value=(42, 42, 2 * (1024.0**3))): with patch("shutil.disk_usage", return_value=(42, 42, 2 * (1024.0**3))):
event = filter_data(coresys, SAMPLE_EVENT, {}) event = filter_data(coresys, SAMPLE_EVENT, {})
@ -182,7 +182,7 @@ def test_issues_on_report(coresys):
assert event["contexts"]["resolution"]["issues"][0]["context"] == ContextType.SYSTEM assert event["contexts"]["resolution"]["issues"][0]["context"] == ContextType.SYSTEM
def test_suggestions_on_report(coresys): async def test_suggestions_on_report(coresys):
"""Attach suggestion to report.""" """Attach suggestion to report."""
coresys.resolution.create_issue( coresys.resolution.create_issue(
@ -192,7 +192,7 @@ def test_suggestions_on_report(coresys):
) )
coresys.config.diagnostics = True coresys.config.diagnostics = True
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
with patch("shutil.disk_usage", return_value=(42, 42, 2 * (1024.0**3))): with patch("shutil.disk_usage", return_value=(42, 42, 2 * (1024.0**3))):
event = filter_data(coresys, SAMPLE_EVENT, {}) event = filter_data(coresys, SAMPLE_EVENT, {})
@ -210,11 +210,11 @@ def test_suggestions_on_report(coresys):
) )
def test_unhealthy_on_report(coresys): async def test_unhealthy_on_report(coresys):
"""Attach unhealthy to report.""" """Attach unhealthy to report."""
coresys.config.diagnostics = True coresys.config.diagnostics = True
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.resolution.unhealthy = UnhealthyReason.DOCKER coresys.resolution.unhealthy = UnhealthyReason.DOCKER
with patch("shutil.disk_usage", return_value=(42, 42, 2 * (1024.0**3))): with patch("shutil.disk_usage", return_value=(42, 42, 2 * (1024.0**3))):
@ -224,11 +224,11 @@ def test_unhealthy_on_report(coresys):
assert event["contexts"]["resolution"]["unhealthy"][-1] == UnhealthyReason.DOCKER assert event["contexts"]["resolution"]["unhealthy"][-1] == UnhealthyReason.DOCKER
def test_images_report(coresys): async def test_images_report(coresys):
"""Attach image to report.""" """Attach image to report."""
coresys.config.diagnostics = True coresys.config.diagnostics = True
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.resolution.evaluate.cached_images.add("my/test:image") coresys.resolution.evaluate.cached_images.add("my/test:image")
with patch("shutil.disk_usage", return_value=(42, 42, 2 * (1024.0**3))): with patch("shutil.disk_usage", return_value=(42, 42, 2 * (1024.0**3))):

View File

@ -7,7 +7,7 @@ from supervisor.const import CoreState
async def test_simple_task(coresys): async def test_simple_task(coresys):
"""Schedule a simple task.""" """Schedule a simple task."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
trigger = [] trigger = []
async def test_task(): async def test_task():
@ -22,7 +22,7 @@ async def test_simple_task(coresys):
async def test_simple_task_repeat(coresys): async def test_simple_task_repeat(coresys):
"""Schedule a simple task and repeat.""" """Schedule a simple task and repeat."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
trigger = [] trigger = []
async def test_task(): async def test_task():
@ -41,7 +41,7 @@ async def test_simple_task_repeat(coresys):
async def test_simple_task_shutdown(coresys): async def test_simple_task_shutdown(coresys):
"""Schedule a simple task with shudown.""" """Schedule a simple task with shudown."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
trigger = [] trigger = []
async def test_task(): async def test_task():
@ -62,7 +62,7 @@ async def test_simple_task_shutdown(coresys):
async def test_simple_task_repeat_block(coresys): async def test_simple_task_repeat_block(coresys):
"""Schedule a simple task with repeat and block.""" """Schedule a simple task with repeat and block."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
trigger = [] trigger = []
async def test_task(): async def test_task():

View File

@ -177,7 +177,7 @@ async def test_reload_updater_triggers_supervisor_update(
): ):
"""Test an updater reload triggers a supervisor update if there is one.""" """Test an updater reload triggers a supervisor update if there is one."""
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.security.content_trust = False coresys.security.content_trust = False
version_data = load_fixture("version_stable.json") version_data = load_fixture("version_stable.json")
@ -218,7 +218,7 @@ async def test_core_backup_cleanup(
tasks: Tasks, coresys: CoreSys, tmp_supervisor_data: Path tasks: Tasks, coresys: CoreSys, tmp_supervisor_data: Path
): ):
"""Test core backup task cleans up old backup files.""" """Test core backup task cleans up old backup files."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.hardware.disk.get_disk_free_space = lambda x: 5000
# Put an old and new backup in folder # Put an old and new backup in folder

View File

@ -334,7 +334,7 @@ async def test_multiple_datadisk_add_remove_signals(
] ]
await coresys.os.datadisk.load() await coresys.os.datadisk.load()
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
assert coresys.resolution.issues == [] assert coresys.resolution.issues == []
assert coresys.resolution.suggestions == [] assert coresys.resolution.suggestions == []
@ -386,7 +386,7 @@ async def test_disabled_datadisk_add_remove_signals(
] ]
await coresys.os.datadisk.load() await coresys.os.datadisk.load()
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
assert coresys.resolution.issues == [] assert coresys.resolution.issues == []
assert coresys.resolution.suggestions == [] assert coresys.resolution.suggestions == []

View File

@ -67,7 +67,7 @@ def test_ota_url_os_name_rel_5_downgrade(coresys: CoreSys) -> None:
async def test_update_fails_if_out_of_date(coresys: CoreSys) -> None: async def test_update_fails_if_out_of_date(coresys: CoreSys) -> None:
"""Test update of OS fails if Supervisor is out of date.""" """Test update of OS fails if Supervisor is out of date."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
with ( with (
patch.object( patch.object(
type(coresys.supervisor), "need_update", new=PropertyMock(return_value=True) type(coresys.supervisor), "need_update", new=PropertyMock(return_value=True)

View File

@ -31,7 +31,7 @@ def fixture_mock_dns_query():
async def test_check_setup(coresys: CoreSys): async def test_check_setup(coresys: CoreSys):
"""Test check for setup.""" """Test check for setup."""
coresys.core.state = CoreState.SETUP await coresys.core.set_state(CoreState.SETUP)
with patch( with patch(
"supervisor.resolution.checks.free_space.CheckFreeSpace.run_check", "supervisor.resolution.checks.free_space.CheckFreeSpace.run_check",
return_value=False, return_value=False,
@ -42,7 +42,7 @@ async def test_check_setup(coresys: CoreSys):
async def test_check_running(coresys: CoreSys): async def test_check_running(coresys: CoreSys):
"""Test check for setup.""" """Test check for setup."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
with patch( with patch(
"supervisor.resolution.checks.free_space.CheckFreeSpace.run_check", "supervisor.resolution.checks.free_space.CheckFreeSpace.run_check",
return_value=False, return_value=False,
@ -54,7 +54,7 @@ async def test_check_running(coresys: CoreSys):
async def test_if_check_make_issue(coresys: CoreSys): async def test_if_check_make_issue(coresys: CoreSys):
"""Test check for setup.""" """Test check for setup."""
free_space = Issue(IssueType.FREE_SPACE, ContextType.SYSTEM) free_space = Issue(IssueType.FREE_SPACE, ContextType.SYSTEM)
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.security.content_trust = False coresys.security.content_trust = False
with patch("shutil.disk_usage", return_value=(1, 1, 1)): with patch("shutil.disk_usage", return_value=(1, 1, 1)):
@ -66,7 +66,7 @@ async def test_if_check_make_issue(coresys: CoreSys):
async def test_if_check_cleanup_issue(coresys: CoreSys): async def test_if_check_cleanup_issue(coresys: CoreSys):
"""Test check for setup.""" """Test check for setup."""
free_space = Issue(IssueType.FREE_SPACE, ContextType.SYSTEM) free_space = Issue(IssueType.FREE_SPACE, ContextType.SYSTEM)
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.security.content_trust = False coresys.security.content_trust = False
with patch("shutil.disk_usage", return_value=(1, 1, 1)): with patch("shutil.disk_usage", return_value=(1, 1, 1)):
@ -82,7 +82,7 @@ async def test_if_check_cleanup_issue(coresys: CoreSys):
async def test_enable_disable_checks(coresys: CoreSys): async def test_enable_disable_checks(coresys: CoreSys):
"""Test enable and disable check.""" """Test enable and disable check."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
free_space = coresys.resolution.check.get("free_space") free_space = coresys.resolution.check.get("free_space")
# Ensure the check was enabled # Ensure the check was enabled

View File

@ -29,7 +29,7 @@ async def test_base(coresys: CoreSys):
async def test_check(coresys: CoreSys): async def test_check(coresys: CoreSys):
"""Test check.""" """Test check."""
addon_pwned = CheckAddonPwned(coresys) addon_pwned = CheckAddonPwned(coresys)
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
addon = TestAddon() addon = TestAddon()
coresys.addons.local[addon.slug] = addon coresys.addons.local[addon.slug] = addon
@ -61,7 +61,7 @@ async def test_check(coresys: CoreSys):
async def test_approve(coresys: CoreSys): async def test_approve(coresys: CoreSys):
"""Test check.""" """Test check."""
addon_pwned = CheckAddonPwned(coresys) addon_pwned = CheckAddonPwned(coresys)
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
addon = TestAddon() addon = TestAddon()
coresys.addons.local[addon.slug] = addon coresys.addons.local[addon.slug] = addon
@ -82,7 +82,7 @@ async def test_with_global_disable(coresys: CoreSys, caplog):
"""Test when pwned is globally disabled.""" """Test when pwned is globally disabled."""
coresys.security.pwned = False coresys.security.pwned = False
addon_pwned = CheckAddonPwned(coresys) addon_pwned = CheckAddonPwned(coresys)
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
addon = TestAddon() addon = TestAddon()
coresys.addons.local[addon.slug] = addon coresys.addons.local[addon.slug] = addon
@ -107,13 +107,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None, return_value=None,
) as check: ) as check:
for state in should_run: for state in should_run:
coresys.core.state = state await coresys.core.set_state(state)
await addon_pwned() await addon_pwned()
check.assert_called_once() check.assert_called_once()
check.reset_mock() check.reset_mock()
for state in should_not_run: for state in should_not_run:
coresys.core.state = state await coresys.core.set_state(state)
await addon_pwned() await addon_pwned()
check.assert_not_called() check.assert_not_called()
check.reset_mock() check.reset_mock()

View File

@ -22,7 +22,7 @@ async def test_base(coresys: CoreSys):
async def test_check_no_backups(coresys: CoreSys): async def test_check_no_backups(coresys: CoreSys):
"""Test check creates issue with no backups.""" """Test check creates issue with no backups."""
backups = CheckBackups(coresys) backups = CheckBackups(coresys)
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
assert len(coresys.resolution.issues) == 0 assert len(coresys.resolution.issues) == 0
await backups.run_check() await backups.run_check()
@ -35,7 +35,7 @@ async def test_check_only_partial_backups(
): ):
"""Test check creates issue with only partial backups.""" """Test check creates issue with only partial backups."""
backups = CheckBackups(coresys) backups = CheckBackups(coresys)
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
assert len(coresys.resolution.issues) == 0 assert len(coresys.resolution.issues) == 0
await backups.run_check() await backups.run_check()
@ -46,7 +46,7 @@ async def test_check_only_partial_backups(
async def test_check_with_backup(coresys: CoreSys, mock_full_backup: Backup): async def test_check_with_backup(coresys: CoreSys, mock_full_backup: Backup):
"""Test check only creates issue if full backup not current.""" """Test check only creates issue if full backup not current."""
backups = CheckBackups(coresys) backups = CheckBackups(coresys)
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
assert len(coresys.resolution.issues) == 0 assert len(coresys.resolution.issues) == 0
await backups.run_check() await backups.run_check()
@ -72,13 +72,13 @@ async def test_did_run(coresys: CoreSys):
return_value=False, return_value=False,
) as check: ) as check:
for state in should_run: for state in should_run:
coresys.core.state = state await coresys.core.set_state(state)
await backups() await backups()
check.assert_called_once() check.assert_called_once()
check.reset_mock() check.reset_mock()
for state in should_not_run: for state in should_not_run:
coresys.core.state = state await coresys.core.set_state(state)
await backups() await backups()
check.assert_not_called() check.assert_not_called()
check.reset_mock() check.reset_mock()

View File

@ -23,7 +23,7 @@ async def test_check(coresys: CoreSys, tmp_path):
"""Test check.""" """Test check."""
with patch("supervisor.config.CoreConfig.path_homeassistant", tmp_path): with patch("supervisor.config.CoreConfig.path_homeassistant", tmp_path):
core_security = CheckCoreSecurity(coresys) core_security = CheckCoreSecurity(coresys)
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
assert len(coresys.resolution.issues) == 0 assert len(coresys.resolution.issues) == 0
@ -57,7 +57,7 @@ async def test_approve(coresys: CoreSys, tmp_path):
"""Test check.""" """Test check."""
with patch("supervisor.config.CoreConfig.path_homeassistant", tmp_path): with patch("supervisor.config.CoreConfig.path_homeassistant", tmp_path):
core_security = CheckCoreSecurity(coresys) core_security = CheckCoreSecurity(coresys)
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.homeassistant._data["version"] = None coresys.homeassistant._data["version"] = None
assert await core_security.approve_check() assert await core_security.approve_check()
@ -84,13 +84,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None, return_value=None,
) as check: ) as check:
for state in should_run: for state in should_run:
coresys.core.state = state await coresys.core.set_state(state)
await core_security() await core_security()
check.assert_called_once() check.assert_called_once()
check.reset_mock() check.reset_mock()
for state in should_not_run: for state in should_not_run:
coresys.core.state = state await coresys.core.set_state(state)
await core_security() await core_security()
check.assert_not_called() check.assert_not_called()
check.reset_mock() check.reset_mock()

View File

@ -21,7 +21,7 @@ async def test_base(coresys: CoreSys):
async def test_check(coresys: CoreSys, install_addon_ssh: Addon): async def test_check(coresys: CoreSys, install_addon_ssh: Addon):
"""Test check for detached addons.""" """Test check for detached addons."""
detached_addon_missing = CheckDetachedAddonMissing(coresys) detached_addon_missing = CheckDetachedAddonMissing(coresys)
coresys.core.state = CoreState.SETUP await coresys.core.set_state(CoreState.SETUP)
await detached_addon_missing() await detached_addon_missing()
assert len(coresys.resolution.issues) == 0 assert len(coresys.resolution.issues) == 0
@ -44,7 +44,7 @@ async def test_check(coresys: CoreSys, install_addon_ssh: Addon):
async def test_approve(coresys: CoreSys, install_addon_ssh: Addon): async def test_approve(coresys: CoreSys, install_addon_ssh: Addon):
"""Test approve existing detached addon issues.""" """Test approve existing detached addon issues."""
detached_addon_missing = CheckDetachedAddonMissing(coresys) detached_addon_missing = CheckDetachedAddonMissing(coresys)
coresys.core.state = CoreState.SETUP await coresys.core.set_state(CoreState.SETUP)
assert ( assert (
await detached_addon_missing.approve_check(reference=install_addon_ssh.slug) await detached_addon_missing.approve_check(reference=install_addon_ssh.slug)
@ -75,13 +75,13 @@ async def test_did_run(coresys: CoreSys):
CheckDetachedAddonMissing, "run_check", return_value=None CheckDetachedAddonMissing, "run_check", return_value=None
) as check: ) as check:
for state in should_run: for state in should_run:
coresys.core.state = state await coresys.core.set_state(state)
await detached_addon_missing() await detached_addon_missing()
check.assert_called_once() check.assert_called_once()
check.reset_mock() check.reset_mock()
for state in should_not_run: for state in should_not_run:
coresys.core.state = state await coresys.core.set_state(state)
await detached_addon_missing() await detached_addon_missing()
check.assert_not_called() check.assert_not_called()
check.reset_mock() check.reset_mock()

View File

@ -25,7 +25,7 @@ async def test_check(
): ):
"""Test check for detached addons.""" """Test check for detached addons."""
detached_addon_removed = CheckDetachedAddonRemoved(coresys) detached_addon_removed = CheckDetachedAddonRemoved(coresys)
coresys.core.state = CoreState.SETUP await coresys.core.set_state(CoreState.SETUP)
await detached_addon_removed() await detached_addon_removed()
assert len(coresys.resolution.issues) == 0 assert len(coresys.resolution.issues) == 0
@ -55,7 +55,7 @@ async def test_approve(
): ):
"""Test approve existing detached addon issues.""" """Test approve existing detached addon issues."""
detached_addon_removed = CheckDetachedAddonRemoved(coresys) detached_addon_removed = CheckDetachedAddonRemoved(coresys)
coresys.core.state = CoreState.SETUP await coresys.core.set_state(CoreState.SETUP)
assert ( assert (
await detached_addon_removed.approve_check(reference=install_addon_ssh.slug) await detached_addon_removed.approve_check(reference=install_addon_ssh.slug)
@ -86,13 +86,13 @@ async def test_did_run(coresys: CoreSys):
CheckDetachedAddonRemoved, "run_check", return_value=None CheckDetachedAddonRemoved, "run_check", return_value=None
) as check: ) as check:
for state in should_run: for state in should_run:
coresys.core.state = state await coresys.core.set_state(state)
await detached_addon_removed() await detached_addon_removed()
check.assert_called_once() check.assert_called_once()
check.reset_mock() check.reset_mock()
for state in should_not_run: for state in should_not_run:
coresys.core.state = state await coresys.core.set_state(state)
await detached_addon_removed() await detached_addon_removed()
check.assert_not_called() check.assert_not_called()
check.reset_mock() check.reset_mock()

View File

@ -36,7 +36,7 @@ async def test_base(coresys: CoreSys):
async def test_check(coresys: CoreSys, sda1_block_service: BlockService): async def test_check(coresys: CoreSys, sda1_block_service: BlockService):
"""Test check.""" """Test check."""
disabled_data_disk = CheckDisabledDataDisk(coresys) disabled_data_disk = CheckDisabledDataDisk(coresys)
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
await disabled_data_disk.run_check() await disabled_data_disk.run_check()
@ -64,7 +64,7 @@ async def test_check(coresys: CoreSys, sda1_block_service: BlockService):
async def test_approve(coresys: CoreSys, sda1_block_service: BlockService): async def test_approve(coresys: CoreSys, sda1_block_service: BlockService):
"""Test approve.""" """Test approve."""
disabled_data_disk = CheckDisabledDataDisk(coresys) disabled_data_disk = CheckDisabledDataDisk(coresys)
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
assert not await disabled_data_disk.approve_check(reference="/dev/sda1") assert not await disabled_data_disk.approve_check(reference="/dev/sda1")
@ -88,13 +88,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None, return_value=None,
) as check: ) as check:
for state in should_run: for state in should_run:
coresys.core.state = state await coresys.core.set_state(state)
await disabled_data_disk() await disabled_data_disk()
check.assert_called_once() check.assert_called_once()
check.reset_mock() check.reset_mock()
for state in should_not_run: for state in should_not_run:
coresys.core.state = state await coresys.core.set_state(state)
await disabled_data_disk() await disabled_data_disk()
check.assert_not_called() check.assert_not_called()
check.reset_mock() check.reset_mock()

View File

@ -31,7 +31,7 @@ async def test_base(coresys: CoreSys):
async def test_check(coresys: CoreSys, dns_query: AsyncMock, capture_exception: Mock): async def test_check(coresys: CoreSys, dns_query: AsyncMock, capture_exception: Mock):
"""Test check for DNS server failures.""" """Test check for DNS server failures."""
dns_server = CheckDNSServer(coresys) dns_server = CheckDNSServer(coresys)
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.plugins.dns.servers = ["dns://1.1.1.1"] coresys.plugins.dns.servers = ["dns://1.1.1.1"]
assert dns_server.dns_servers == [ assert dns_server.dns_servers == [
@ -65,7 +65,7 @@ async def test_check(coresys: CoreSys, dns_query: AsyncMock, capture_exception:
async def test_approve(coresys: CoreSys, dns_query: AsyncMock): async def test_approve(coresys: CoreSys, dns_query: AsyncMock):
"""Test approve existing DNS Server failure issues.""" """Test approve existing DNS Server failure issues."""
dns_server = CheckDNSServer(coresys) dns_server = CheckDNSServer(coresys)
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
assert dns_server.dns_servers == ["dns://192.168.30.1"] assert dns_server.dns_servers == ["dns://192.168.30.1"]
dns_query.side_effect = DNSError() dns_query.side_effect = DNSError()
@ -92,13 +92,13 @@ async def test_did_run(coresys: CoreSys):
with patch.object(CheckDNSServer, "run_check", return_value=None) as check: with patch.object(CheckDNSServer, "run_check", return_value=None) as check:
for state in should_run: for state in should_run:
coresys.core.state = state await coresys.core.set_state(state)
await dns_server() await dns_server()
check.assert_called_once() check.assert_called_once()
check.reset_mock() check.reset_mock()
for state in should_not_run: for state in should_not_run:
coresys.core.state = state await coresys.core.set_state(state)
await dns_server() await dns_server()
check.assert_not_called() check.assert_not_called()
check.reset_mock() check.reset_mock()
@ -107,7 +107,7 @@ async def test_did_run(coresys: CoreSys):
async def test_check_if_affected(coresys: CoreSys): async def test_check_if_affected(coresys: CoreSys):
"""Test that check is still executed even if already affected.""" """Test that check is still executed even if already affected."""
dns_server = CheckDNSServer(coresys) dns_server = CheckDNSServer(coresys)
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.resolution.create_issue( coresys.resolution.create_issue(
IssueType.DNS_SERVER_FAILED, IssueType.DNS_SERVER_FAILED,

View File

@ -31,7 +31,7 @@ async def test_base(coresys: CoreSys):
async def test_check(coresys: CoreSys, dns_query: AsyncMock, capture_exception: Mock): async def test_check(coresys: CoreSys, dns_query: AsyncMock, capture_exception: Mock):
"""Test check for DNS server IPv6 errors.""" """Test check for DNS server IPv6 errors."""
dns_server_ipv6 = CheckDNSServerIPv6(coresys) dns_server_ipv6 = CheckDNSServerIPv6(coresys)
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.plugins.dns.servers = ["dns://1.1.1.1"] coresys.plugins.dns.servers = ["dns://1.1.1.1"]
assert dns_server_ipv6.dns_servers == [ assert dns_server_ipv6.dns_servers == [
@ -71,7 +71,7 @@ async def test_check(coresys: CoreSys, dns_query: AsyncMock, capture_exception:
async def test_approve(coresys: CoreSys, dns_query: AsyncMock): async def test_approve(coresys: CoreSys, dns_query: AsyncMock):
"""Test approve existing DNS Server IPv6 error issues.""" """Test approve existing DNS Server IPv6 error issues."""
dns_server_ipv6 = CheckDNSServerIPv6(coresys) dns_server_ipv6 = CheckDNSServerIPv6(coresys)
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
assert dns_server_ipv6.dns_servers == ["dns://192.168.30.1"] assert dns_server_ipv6.dns_servers == ["dns://192.168.30.1"]
dns_query.side_effect = DNSError(4, "Domain name not found") dns_query.side_effect = DNSError(4, "Domain name not found")
@ -103,13 +103,13 @@ async def test_did_run(coresys: CoreSys):
with patch.object(CheckDNSServerIPv6, "run_check", return_value=None) as check: with patch.object(CheckDNSServerIPv6, "run_check", return_value=None) as check:
for state in should_run: for state in should_run:
coresys.core.state = state await coresys.core.set_state(state)
await dns_server_ipv6() await dns_server_ipv6()
check.assert_called_once() check.assert_called_once()
check.reset_mock() check.reset_mock()
for state in should_not_run: for state in should_not_run:
coresys.core.state = state await coresys.core.set_state(state)
await dns_server_ipv6() await dns_server_ipv6()
check.assert_not_called() check.assert_not_called()
check.reset_mock() check.reset_mock()
@ -118,7 +118,7 @@ async def test_did_run(coresys: CoreSys):
async def test_check_if_affected(coresys: CoreSys): async def test_check_if_affected(coresys: CoreSys):
"""Test that check is still executed even if already affected.""" """Test that check is still executed even if already affected."""
dns_server_ipv6 = CheckDNSServerIPv6(coresys) dns_server_ipv6 = CheckDNSServerIPv6(coresys)
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.resolution.create_issue( coresys.resolution.create_issue(
IssueType.DNS_SERVER_IPV6_ERROR, IssueType.DNS_SERVER_IPV6_ERROR,

View File

@ -58,7 +58,7 @@ async def test_check(
await coresys.addons.load() await coresys.addons.load()
docker_config = CheckDockerConfig(coresys) docker_config = CheckDockerConfig(coresys)
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
assert not coresys.resolution.issues assert not coresys.resolution.issues
assert not coresys.resolution.suggestions assert not coresys.resolution.suggestions
@ -131,13 +131,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None, return_value=None,
) as check: ) as check:
for state in should_run: for state in should_run:
coresys.core.state = state await coresys.core.set_state(state)
await docker_config() await docker_config()
check.assert_called_once() check.assert_called_once()
check.reset_mock() check.reset_mock()
for state in should_not_run: for state in should_not_run:
coresys.core.state = state await coresys.core.set_state(state)
await docker_config() await docker_config()
check.assert_not_called() check.assert_not_called()
check.reset_mock() check.reset_mock()

View File

@ -45,7 +45,7 @@ async def test_base(coresys: CoreSys):
async def test_check(coresys: CoreSys, suggestion: SuggestionType | None): async def test_check(coresys: CoreSys, suggestion: SuggestionType | None):
"""Test check.""" """Test check."""
free_space = CheckFreeSpace(coresys) free_space = CheckFreeSpace(coresys)
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
assert len(coresys.resolution.issues) == 0 assert len(coresys.resolution.issues) == 0
@ -68,7 +68,7 @@ async def test_check(coresys: CoreSys, suggestion: SuggestionType | None):
async def test_approve(coresys: CoreSys): async def test_approve(coresys: CoreSys):
"""Test check.""" """Test check."""
free_space = CheckFreeSpace(coresys) free_space = CheckFreeSpace(coresys)
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
with patch("shutil.disk_usage", return_value=(1, 1, 1)): with patch("shutil.disk_usage", return_value=(1, 1, 1)):
assert await free_space.approve_check() assert await free_space.approve_check()
@ -90,13 +90,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None, return_value=None,
) as check: ) as check:
for state in should_run: for state in should_run:
coresys.core.state = state await coresys.core.set_state(state)
await free_space() await free_space()
check.assert_called_once() check.assert_called_once()
check.reset_mock() check.reset_mock()
for state in should_not_run: for state in should_not_run:
coresys.core.state = state await coresys.core.set_state(state)
await free_space() await free_space()
check.assert_not_called() check.assert_not_called()
check.reset_mock() check.reset_mock()

View File

@ -36,7 +36,7 @@ async def test_base(coresys: CoreSys):
async def test_check(coresys: CoreSys, sda1_block_service: BlockService): async def test_check(coresys: CoreSys, sda1_block_service: BlockService):
"""Test check.""" """Test check."""
multiple_data_disks = CheckMultipleDataDisks(coresys) multiple_data_disks = CheckMultipleDataDisks(coresys)
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
await multiple_data_disks.run_check() await multiple_data_disks.run_check()
@ -64,7 +64,7 @@ async def test_check(coresys: CoreSys, sda1_block_service: BlockService):
async def test_approve(coresys: CoreSys, sda1_block_service: BlockService): async def test_approve(coresys: CoreSys, sda1_block_service: BlockService):
"""Test approve.""" """Test approve."""
multiple_data_disks = CheckMultipleDataDisks(coresys) multiple_data_disks = CheckMultipleDataDisks(coresys)
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
assert not await multiple_data_disks.approve_check(reference="/dev/sda1") assert not await multiple_data_disks.approve_check(reference="/dev/sda1")
@ -88,13 +88,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None, return_value=None,
) as check: ) as check:
for state in should_run: for state in should_run:
coresys.core.state = state await coresys.core.set_state(state)
await multiple_data_disks() await multiple_data_disks()
check.assert_called_once() check.assert_called_once()
check.reset_mock() check.reset_mock()
for state in should_not_run: for state in should_not_run:
coresys.core.state = state await coresys.core.set_state(state)
await multiple_data_disks() await multiple_data_disks()
check.assert_not_called() check.assert_not_called()
check.reset_mock() check.reset_mock()

View File

@ -36,7 +36,7 @@ async def test_check(
): ):
"""Test check.""" """Test check."""
network_interface = CheckNetworkInterfaceIPV4(coresys) network_interface = CheckNetworkInterfaceIPV4(coresys)
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
assert len(coresys.resolution.issues) == 0 assert len(coresys.resolution.issues) == 0
@ -65,7 +65,7 @@ async def test_approve(
): ):
"""Test check.""" """Test check."""
network_interface = CheckNetworkInterfaceIPV4(coresys) network_interface = CheckNetworkInterfaceIPV4(coresys)
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
assert not await network_interface.approve_check("eth0") assert not await network_interface.approve_check("eth0")
@ -89,13 +89,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None, return_value=None,
) as check: ) as check:
for state in should_run: for state in should_run:
coresys.core.state = state await coresys.core.set_state(state)
await network_interface() await network_interface()
check.assert_called_once() check.assert_called_once()
check.reset_mock() check.reset_mock()
for state in should_not_run: for state in should_not_run:
coresys.core.state = state await coresys.core.set_state(state)
await network_interface() await network_interface()
check.assert_not_called() check.assert_not_called()
check.reset_mock() check.reset_mock()

View File

@ -20,7 +20,7 @@ async def test_base(coresys: CoreSys):
async def test_check(coresys: CoreSys): async def test_check(coresys: CoreSys):
"""Test check.""" """Test check."""
supervisor_trust = CheckSupervisorTrust(coresys) supervisor_trust = CheckSupervisorTrust(coresys)
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
assert len(coresys.resolution.issues) == 0 assert len(coresys.resolution.issues) == 0
@ -47,7 +47,7 @@ async def test_check(coresys: CoreSys):
async def test_approve(coresys: CoreSys): async def test_approve(coresys: CoreSys):
"""Test check.""" """Test check."""
supervisor_trust = CheckSupervisorTrust(coresys) supervisor_trust = CheckSupervisorTrust(coresys)
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted) coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
assert await supervisor_trust.approve_check() assert await supervisor_trust.approve_check()
@ -60,7 +60,7 @@ async def test_with_global_disable(coresys: CoreSys, caplog):
"""Test when pwned is globally disabled.""" """Test when pwned is globally disabled."""
coresys.security.content_trust = False coresys.security.content_trust = False
supervisor_trust = CheckSupervisorTrust(coresys) supervisor_trust = CheckSupervisorTrust(coresys)
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
assert len(coresys.resolution.issues) == 0 assert len(coresys.resolution.issues) == 0
coresys.security.verify_own_content = AsyncMock(side_effect=CodeNotaryUntrusted) coresys.security.verify_own_content = AsyncMock(side_effect=CodeNotaryUntrusted)
@ -84,13 +84,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None, return_value=None,
) as check: ) as check:
for state in should_run: for state in should_run:
coresys.core.state = state await coresys.core.set_state(state)
await supervisor_trust() await supervisor_trust()
check.assert_called_once() check.assert_called_once()
check.reset_mock() check.reset_mock()
for state in should_not_run: for state in should_not_run:
coresys.core.state = state await coresys.core.set_state(state)
await supervisor_trust() await supervisor_trust()
check.assert_not_called() check.assert_not_called()
check.reset_mock() check.reset_mock()

View File

@ -12,7 +12,7 @@ from supervisor.resolution.evaluations.connectivity_check import (
async def test_evaluation(coresys: CoreSys): async def test_evaluation(coresys: CoreSys):
"""Test evaluation.""" """Test evaluation."""
connectivity_check = EvaluateConnectivityCheck(coresys) connectivity_check = EvaluateConnectivityCheck(coresys)
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
assert connectivity_check.reason not in coresys.resolution.unsupported assert connectivity_check.reason not in coresys.resolution.unsupported
@ -46,13 +46,13 @@ async def test_did_run(coresys: CoreSys):
return_value=False, return_value=False,
) as evaluate: ) as evaluate:
for state in should_run: for state in should_run:
coresys.core.state = state await coresys.core.set_state(state)
await connectivity_check() await connectivity_check()
evaluate.assert_called_once() evaluate.assert_called_once()
evaluate.reset_mock() evaluate.reset_mock()
for state in should_not_run: for state in should_not_run:
coresys.core.state = state await coresys.core.set_state(state)
await connectivity_check() await connectivity_check()
evaluate.assert_not_called() evaluate.assert_not_called()
evaluate.reset_mock() evaluate.reset_mock()

View File

@ -12,7 +12,7 @@ from supervisor.resolution.evaluations.apparmor import EvaluateAppArmor
async def test_evaluation(coresys: CoreSys): async def test_evaluation(coresys: CoreSys):
"""Test evaluation.""" """Test evaluation."""
apparmor = EvaluateAppArmor(coresys) apparmor = EvaluateAppArmor(coresys)
coresys.core.state = CoreState.INITIALIZE await coresys.core.set_state(CoreState.INITIALIZE)
assert apparmor.reason not in coresys.resolution.unsupported assert apparmor.reason not in coresys.resolution.unsupported
@ -42,13 +42,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None, return_value=None,
) as evaluate: ) as evaluate:
for state in should_run: for state in should_run:
coresys.core.state = state await coresys.core.set_state(state)
await apparmor() await apparmor()
evaluate.assert_called_once() evaluate.assert_called_once()
evaluate.reset_mock() evaluate.reset_mock()
for state in should_not_run: for state in should_not_run:
coresys.core.state = state await coresys.core.set_state(state)
await apparmor() await apparmor()
evaluate.assert_not_called() evaluate.assert_not_called()
evaluate.reset_mock() evaluate.reset_mock()
@ -57,7 +57,7 @@ async def test_did_run(coresys: CoreSys):
async def test_evaluation_error(coresys: CoreSys): async def test_evaluation_error(coresys: CoreSys):
"""Test error reading file during evaluation.""" """Test error reading file during evaluation."""
apparmor = EvaluateAppArmor(coresys) apparmor = EvaluateAppArmor(coresys)
coresys.core.state = CoreState.INITIALIZE await coresys.core.set_state(CoreState.INITIALIZE)
assert apparmor.reason not in coresys.resolution.unsupported assert apparmor.reason not in coresys.resolution.unsupported

View File

@ -15,7 +15,7 @@ from supervisor.resolution.evaluations.cgroup import (
async def test_evaluation(coresys: CoreSys): async def test_evaluation(coresys: CoreSys):
"""Test evaluation.""" """Test evaluation."""
cgroup_version = EvaluateCGroupVersion(coresys) cgroup_version = EvaluateCGroupVersion(coresys)
coresys.core.state = CoreState.SETUP await coresys.core.set_state(CoreState.SETUP)
assert cgroup_version.reason not in coresys.resolution.unsupported assert cgroup_version.reason not in coresys.resolution.unsupported
@ -37,7 +37,7 @@ async def test_evaluation(coresys: CoreSys):
async def test_evaluation_os_available(coresys: CoreSys, os_available): async def test_evaluation_os_available(coresys: CoreSys, os_available):
"""Test evaluation with OS available.""" """Test evaluation with OS available."""
cgroup_version = EvaluateCGroupVersion(coresys) cgroup_version = EvaluateCGroupVersion(coresys)
coresys.core.state = CoreState.SETUP await coresys.core.set_state(CoreState.SETUP)
coresys.docker.info.cgroup = CGROUP_V2_VERSION coresys.docker.info.cgroup = CGROUP_V2_VERSION
await cgroup_version() await cgroup_version()
@ -61,13 +61,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None, return_value=None,
) as evaluate: ) as evaluate:
for state in should_run: for state in should_run:
coresys.core.state = state await coresys.core.set_state(state)
await cgroup_version() await cgroup_version()
evaluate.assert_called_once() evaluate.assert_called_once()
evaluate.reset_mock() evaluate.reset_mock()
for state in should_not_run: for state in should_not_run:
coresys.core.state = state await coresys.core.set_state(state)
await cgroup_version() await cgroup_version()
evaluate.assert_not_called() evaluate.assert_not_called()
evaluate.reset_mock() evaluate.reset_mock()

View File

@ -25,7 +25,7 @@ def _make_image_attr(image: str) -> MagicMock:
async def test_evaluation(coresys: CoreSys): async def test_evaluation(coresys: CoreSys):
"""Test evaluation.""" """Test evaluation."""
container = EvaluateContainer(coresys) container = EvaluateContainer(coresys)
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
assert container.reason not in coresys.resolution.unsupported assert container.reason not in coresys.resolution.unsupported
assert UnhealthyReason.DOCKER not in coresys.resolution.unhealthy assert UnhealthyReason.DOCKER not in coresys.resolution.unhealthy
@ -57,7 +57,7 @@ async def test_evaluation(coresys: CoreSys):
async def test_corrupt_docker(coresys: CoreSys): async def test_corrupt_docker(coresys: CoreSys):
"""Test corrupt docker issue.""" """Test corrupt docker issue."""
container = EvaluateContainer(coresys) container = EvaluateContainer(coresys)
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
corrupt_docker = Issue(IssueType.CORRUPT_DOCKER, ContextType.SYSTEM) corrupt_docker = Issue(IssueType.CORRUPT_DOCKER, ContextType.SYSTEM)
assert corrupt_docker not in coresys.resolution.issues assert corrupt_docker not in coresys.resolution.issues
@ -80,13 +80,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None, return_value=None,
) as evaluate: ) as evaluate:
for state in should_run: for state in should_run:
coresys.core.state = state await coresys.core.set_state(state)
await container() await container()
evaluate.assert_called_once() evaluate.assert_called_once()
evaluate.reset_mock() evaluate.reset_mock()
for state in should_not_run: for state in should_not_run:
coresys.core.state = state await coresys.core.set_state(state)
await container() await container()
evaluate.assert_not_called() evaluate.assert_not_called()
evaluate.reset_mock() evaluate.reset_mock()

View File

@ -11,7 +11,7 @@ from supervisor.resolution.evaluations.content_trust import EvaluateContentTrust
async def test_evaluation(coresys: CoreSys): async def test_evaluation(coresys: CoreSys):
"""Test evaluation.""" """Test evaluation."""
job_conditions = EvaluateContentTrust(coresys) job_conditions = EvaluateContentTrust(coresys)
coresys.core.state = CoreState.SETUP await coresys.core.set_state(CoreState.SETUP)
await job_conditions() await job_conditions()
assert job_conditions.reason not in coresys.resolution.unsupported assert job_conditions.reason not in coresys.resolution.unsupported
@ -34,13 +34,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None, return_value=None,
) as evaluate: ) as evaluate:
for state in should_run: for state in should_run:
coresys.core.state = state await coresys.core.set_state(state)
await job_conditions() await job_conditions()
evaluate.assert_called_once() evaluate.assert_called_once()
evaluate.reset_mock() evaluate.reset_mock()
for state in should_not_run: for state in should_not_run:
coresys.core.state = state await coresys.core.set_state(state)
await job_conditions() await job_conditions()
evaluate.assert_not_called() evaluate.assert_not_called()
evaluate.reset_mock() evaluate.reset_mock()

View File

@ -11,7 +11,7 @@ from supervisor.resolution.evaluations.dbus import EvaluateDbus
async def test_evaluation(coresys: CoreSys): async def test_evaluation(coresys: CoreSys):
"""Test evaluation.""" """Test evaluation."""
dbus = EvaluateDbus(coresys) dbus = EvaluateDbus(coresys)
coresys.core.state = CoreState.INITIALIZE await coresys.core.set_state(CoreState.INITIALIZE)
assert dbus.reason not in coresys.resolution.unsupported assert dbus.reason not in coresys.resolution.unsupported
@ -37,13 +37,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None, return_value=None,
) as evaluate: ) as evaluate:
for state in should_run: for state in should_run:
coresys.core.state = state await coresys.core.set_state(state)
await dbus() await dbus()
evaluate.assert_called_once() evaluate.assert_called_once()
evaluate.reset_mock() evaluate.reset_mock()
for state in should_not_run: for state in should_not_run:
coresys.core.state = state await coresys.core.set_state(state)
await dbus() await dbus()
evaluate.assert_not_called() evaluate.assert_not_called()
evaluate.reset_mock() evaluate.reset_mock()

View File

@ -11,7 +11,7 @@ from supervisor.resolution.evaluations.dns_server import EvaluateDNSServer
async def test_evaluation(coresys: CoreSys): async def test_evaluation(coresys: CoreSys):
"""Test evaluation.""" """Test evaluation."""
dns_server = EvaluateDNSServer(coresys) dns_server = EvaluateDNSServer(coresys)
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
assert dns_server.reason not in coresys.resolution.unsupported assert dns_server.reason not in coresys.resolution.unsupported
assert coresys.plugins.dns.fallback is True assert coresys.plugins.dns.fallback is True
@ -48,13 +48,13 @@ async def test_did_run(coresys: CoreSys):
with patch.object(EvaluateDNSServer, "evaluate", return_value=None) as evaluate: with patch.object(EvaluateDNSServer, "evaluate", return_value=None) as evaluate:
for state in should_run: for state in should_run:
coresys.core.state = state await coresys.core.set_state(state)
await dns_server() await dns_server()
evaluate.assert_called_once() evaluate.assert_called_once()
evaluate.reset_mock() evaluate.reset_mock()
for state in should_not_run: for state in should_not_run:
coresys.core.state = state await coresys.core.set_state(state)
await dns_server() await dns_server()
evaluate.assert_not_called() evaluate.assert_not_called()
evaluate.reset_mock() evaluate.reset_mock()

View File

@ -15,7 +15,7 @@ from supervisor.resolution.evaluations.docker_configuration import (
async def test_evaluation(coresys: CoreSys): async def test_evaluation(coresys: CoreSys):
"""Test evaluation.""" """Test evaluation."""
docker_configuration = EvaluateDockerConfiguration(coresys) docker_configuration = EvaluateDockerConfiguration(coresys)
coresys.core.state = CoreState.INITIALIZE await coresys.core.set_state(CoreState.INITIALIZE)
assert docker_configuration.reason not in coresys.resolution.unsupported assert docker_configuration.reason not in coresys.resolution.unsupported
@ -50,13 +50,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None, return_value=None,
) as evaluate: ) as evaluate:
for state in should_run: for state in should_run:
coresys.core.state = state await coresys.core.set_state(state)
await docker_configuration() await docker_configuration()
evaluate.assert_called_once() evaluate.assert_called_once()
evaluate.reset_mock() evaluate.reset_mock()
for state in should_not_run: for state in should_not_run:
coresys.core.state = state await coresys.core.set_state(state)
await docker_configuration() await docker_configuration()
evaluate.assert_not_called() evaluate.assert_not_called()
evaluate.reset_mock() evaluate.reset_mock()

View File

@ -11,7 +11,7 @@ from supervisor.resolution.evaluations.docker_version import EvaluateDockerVersi
async def test_evaluation(coresys: CoreSys): async def test_evaluation(coresys: CoreSys):
"""Test evaluation.""" """Test evaluation."""
docker_version = EvaluateDockerVersion(coresys) docker_version = EvaluateDockerVersion(coresys)
coresys.core.state = CoreState.INITIALIZE await coresys.core.set_state(CoreState.INITIALIZE)
assert docker_version.reason not in coresys.resolution.unsupported assert docker_version.reason not in coresys.resolution.unsupported
@ -37,13 +37,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None, return_value=None,
) as evaluate: ) as evaluate:
for state in should_run: for state in should_run:
coresys.core.state = state await coresys.core.set_state(state)
await docker_version() await docker_version()
evaluate.assert_called_once() evaluate.assert_called_once()
evaluate.reset_mock() evaluate.reset_mock()
for state in should_not_run: for state in should_not_run:
coresys.core.state = state await coresys.core.set_state(state)
await docker_version() await docker_version()
evaluate.assert_not_called() evaluate.assert_not_called()
evaluate.reset_mock() evaluate.reset_mock()

View File

@ -12,7 +12,7 @@ from supervisor.resolution.evaluations.job_conditions import EvaluateJobConditio
async def test_evaluation(coresys: CoreSys): async def test_evaluation(coresys: CoreSys):
"""Test evaluation.""" """Test evaluation."""
job_conditions = EvaluateJobConditions(coresys) job_conditions = EvaluateJobConditions(coresys)
coresys.core.state = CoreState.SETUP await coresys.core.set_state(CoreState.SETUP)
await job_conditions() await job_conditions()
assert job_conditions.reason not in coresys.resolution.unsupported assert job_conditions.reason not in coresys.resolution.unsupported
@ -35,13 +35,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None, return_value=None,
) as evaluate: ) as evaluate:
for state in should_run: for state in should_run:
coresys.core.state = state await coresys.core.set_state(state)
await job_conditions() await job_conditions()
evaluate.assert_called_once() evaluate.assert_called_once()
evaluate.reset_mock() evaluate.reset_mock()
for state in should_not_run: for state in should_not_run:
coresys.core.state = state await coresys.core.set_state(state)
await job_conditions() await job_conditions()
evaluate.assert_not_called() evaluate.assert_not_called()
evaluate.reset_mock() evaluate.reset_mock()

View File

@ -12,7 +12,7 @@ from supervisor.resolution.evaluations.lxc import EvaluateLxc
async def test_evaluation(coresys: CoreSys): async def test_evaluation(coresys: CoreSys):
"""Test evaluation.""" """Test evaluation."""
lxc = EvaluateLxc(coresys) lxc = EvaluateLxc(coresys)
coresys.core.state = CoreState.INITIALIZE await coresys.core.set_state(CoreState.INITIALIZE)
assert lxc.reason not in coresys.resolution.unsupported assert lxc.reason not in coresys.resolution.unsupported
@ -40,13 +40,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None, return_value=None,
) as evaluate: ) as evaluate:
for state in should_run: for state in should_run:
coresys.core.state = state await coresys.core.set_state(state)
await lxc() await lxc()
evaluate.assert_called_once() evaluate.assert_called_once()
evaluate.reset_mock() evaluate.reset_mock()
for state in should_not_run: for state in should_not_run:
coresys.core.state = state await coresys.core.set_state(state)
await lxc() await lxc()
evaluate.assert_not_called() evaluate.assert_not_called()
evaluate.reset_mock() evaluate.reset_mock()

View File

@ -11,7 +11,7 @@ from supervisor.resolution.evaluations.network_manager import EvaluateNetworkMan
async def test_evaluation(coresys: CoreSys, dbus_is_connected): async def test_evaluation(coresys: CoreSys, dbus_is_connected):
"""Test evaluation.""" """Test evaluation."""
network_manager = EvaluateNetworkManager(coresys) network_manager = EvaluateNetworkManager(coresys)
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
assert network_manager.reason not in coresys.resolution.unsupported assert network_manager.reason not in coresys.resolution.unsupported
@ -39,13 +39,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None, return_value=None,
) as evaluate: ) as evaluate:
for state in should_run: for state in should_run:
coresys.core.state = state await coresys.core.set_state(state)
await network_manager() await network_manager()
evaluate.assert_called_once() evaluate.assert_called_once()
evaluate.reset_mock() evaluate.reset_mock()
for state in should_not_run: for state in should_not_run:
coresys.core.state = state await coresys.core.set_state(state)
await network_manager() await network_manager()
evaluate.assert_not_called() evaluate.assert_not_called()
evaluate.reset_mock() evaluate.reset_mock()

View File

@ -14,7 +14,7 @@ from supervisor.resolution.evaluations.operating_system import (
async def test_evaluation(coresys: CoreSys): async def test_evaluation(coresys: CoreSys):
"""Test evaluation.""" """Test evaluation."""
operating_system = EvaluateOperatingSystem(coresys) operating_system = EvaluateOperatingSystem(coresys)
coresys.core.state = CoreState.SETUP await coresys.core.set_state(CoreState.SETUP)
assert operating_system.reason not in coresys.resolution.unsupported assert operating_system.reason not in coresys.resolution.unsupported
@ -45,13 +45,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None, return_value=None,
) as evaluate: ) as evaluate:
for state in should_run: for state in should_run:
coresys.core.state = state await coresys.core.set_state(state)
await operating_system() await operating_system()
evaluate.assert_called_once() evaluate.assert_called_once()
evaluate.reset_mock() evaluate.reset_mock()
for state in should_not_run: for state in should_not_run:
coresys.core.state = state await coresys.core.set_state(state)
await operating_system() await operating_system()
evaluate.assert_not_called() evaluate.assert_not_called()
evaluate.reset_mock() evaluate.reset_mock()

View File

@ -12,7 +12,7 @@ from supervisor.resolution.evaluations.os_agent import EvaluateOSAgent
async def test_evaluation(coresys: CoreSys): async def test_evaluation(coresys: CoreSys):
"""Test evaluation.""" """Test evaluation."""
agent = EvaluateOSAgent(coresys) agent = EvaluateOSAgent(coresys)
coresys.core.state = CoreState.SETUP await coresys.core.set_state(CoreState.SETUP)
assert agent.reason not in coresys.resolution.unsupported assert agent.reason not in coresys.resolution.unsupported
@ -42,13 +42,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None, return_value=None,
) as evaluate: ) as evaluate:
for state in should_run: for state in should_run:
coresys.core.state = state await coresys.core.set_state(state)
await agent() await agent()
evaluate.assert_called_once() evaluate.assert_called_once()
evaluate.reset_mock() evaluate.reset_mock()
for state in should_not_run: for state in should_not_run:
coresys.core.state = state await coresys.core.set_state(state)
await agent() await agent()
evaluate.assert_not_called() evaluate.assert_not_called()
evaluate.reset_mock() evaluate.reset_mock()

View File

@ -11,7 +11,7 @@ from supervisor.resolution.evaluations.privileged import EvaluatePrivileged
async def test_evaluation(coresys: CoreSys): async def test_evaluation(coresys: CoreSys):
"""Test evaluation.""" """Test evaluation."""
privileged = EvaluatePrivileged(coresys) privileged = EvaluatePrivileged(coresys)
coresys.core.state = CoreState.INITIALIZE await coresys.core.set_state(CoreState.INITIALIZE)
assert privileged.reason not in coresys.resolution.unsupported assert privileged.reason not in coresys.resolution.unsupported
@ -37,13 +37,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None, return_value=None,
) as evaluate: ) as evaluate:
for state in should_run: for state in should_run:
coresys.core.state = state await coresys.core.set_state(state)
await privileged() await privileged()
evaluate.assert_called_once() evaluate.assert_called_once()
evaluate.reset_mock() evaluate.reset_mock()
for state in should_not_run: for state in should_not_run:
coresys.core.state = state await coresys.core.set_state(state)
await privileged() await privileged()
evaluate.assert_not_called() evaluate.assert_not_called()
evaluate.reset_mock() evaluate.reset_mock()

View File

@ -10,7 +10,7 @@ from supervisor.resolution.evaluations.resolved import EvaluateResolved
async def test_evaluation(coresys: CoreSys, dbus_is_connected): async def test_evaluation(coresys: CoreSys, dbus_is_connected):
"""Test evaluation.""" """Test evaluation."""
resolved = EvaluateResolved(coresys) resolved = EvaluateResolved(coresys)
coresys.core.state = CoreState.SETUP await coresys.core.set_state(CoreState.SETUP)
assert resolved.reason not in coresys.resolution.unsupported assert resolved.reason not in coresys.resolution.unsupported
@ -36,13 +36,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None, return_value=None,
) as evaluate: ) as evaluate:
for state in should_run: for state in should_run:
coresys.core.state = state await coresys.core.set_state(state)
await resolved() await resolved()
evaluate.assert_called_once() evaluate.assert_called_once()
evaluate.reset_mock() evaluate.reset_mock()
for state in should_not_run: for state in should_not_run:
coresys.core.state = state await coresys.core.set_state(state)
await resolved() await resolved()
evaluate.assert_not_called() evaluate.assert_not_called()
evaluate.reset_mock() evaluate.reset_mock()

View File

@ -21,7 +21,7 @@ async def test_evaluation(coresys: CoreSys):
Path(f"{os.getcwd()}/supervisor"), Path(f"{os.getcwd()}/supervisor"),
): ):
sourcemods = EvaluateSourceMods(coresys) sourcemods = EvaluateSourceMods(coresys)
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
assert sourcemods.reason not in coresys.resolution.unsupported assert sourcemods.reason not in coresys.resolution.unsupported
coresys.security.verify_own_content = AsyncMock(side_effect=CodeNotaryUntrusted) coresys.security.verify_own_content = AsyncMock(side_effect=CodeNotaryUntrusted)
@ -50,13 +50,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None, return_value=None,
) as evaluate: ) as evaluate:
for state in should_run: for state in should_run:
coresys.core.state = state await coresys.core.set_state(state)
await sourcemods() await sourcemods()
evaluate.assert_called_once() evaluate.assert_called_once()
evaluate.reset_mock() evaluate.reset_mock()
for state in should_not_run: for state in should_not_run:
coresys.core.state = state await coresys.core.set_state(state)
await sourcemods() await sourcemods()
evaluate.assert_not_called() evaluate.assert_not_called()
evaluate.reset_mock() evaluate.reset_mock()
@ -65,7 +65,7 @@ async def test_did_run(coresys: CoreSys):
async def test_evaluation_error(coresys: CoreSys): async def test_evaluation_error(coresys: CoreSys):
"""Test error reading file during evaluation.""" """Test error reading file during evaluation."""
sourcemods = EvaluateSourceMods(coresys) sourcemods = EvaluateSourceMods(coresys)
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
corrupt_fs = Issue(IssueType.CORRUPT_FILESYSTEM, ContextType.SYSTEM) corrupt_fs = Issue(IssueType.CORRUPT_FILESYSTEM, ContextType.SYSTEM)
assert sourcemods.reason not in coresys.resolution.unsupported assert sourcemods.reason not in coresys.resolution.unsupported

View File

@ -14,7 +14,7 @@ async def test_evaluation(coresys: CoreSys):
need_update_mock = PropertyMock() need_update_mock = PropertyMock()
with patch.object(type(coresys.supervisor), "need_update", new=need_update_mock): with patch.object(type(coresys.supervisor), "need_update", new=need_update_mock):
supervisor_version = EvaluateSupervisorVersion(coresys) supervisor_version = EvaluateSupervisorVersion(coresys)
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
need_update_mock.return_value = False need_update_mock.return_value = False
# Only unsupported if out of date and auto update is off # Only unsupported if out of date and auto update is off
@ -41,13 +41,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None, return_value=None,
) as evaluate: ) as evaluate:
for state in should_run: for state in should_run:
coresys.core.state = state await coresys.core.set_state(state)
await supervisor_version() await supervisor_version()
evaluate.assert_called_once() evaluate.assert_called_once()
evaluate.reset_mock() evaluate.reset_mock()
for state in should_not_run: for state in should_not_run:
coresys.core.state = state await coresys.core.set_state(state)
await supervisor_version() await supervisor_version()
evaluate.assert_not_called() evaluate.assert_not_called()
evaluate.reset_mock() evaluate.reset_mock()

View File

@ -12,7 +12,7 @@ from supervisor.resolution.evaluations.systemd import EvaluateSystemd
async def test_evaluation(coresys: CoreSys): async def test_evaluation(coresys: CoreSys):
"""Test evaluation.""" """Test evaluation."""
systemd = EvaluateSystemd(coresys) systemd = EvaluateSystemd(coresys)
coresys.core.state = CoreState.SETUP await coresys.core.set_state(CoreState.SETUP)
assert systemd.reason not in coresys.resolution.unsupported assert systemd.reason not in coresys.resolution.unsupported
@ -46,13 +46,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None, return_value=None,
) as evaluate: ) as evaluate:
for state in should_run: for state in should_run:
coresys.core.state = state await coresys.core.set_state(state)
await systemd() await systemd()
evaluate.assert_called_once() evaluate.assert_called_once()
evaluate.reset_mock() evaluate.reset_mock()
for state in should_not_run: for state in should_not_run:
coresys.core.state = state await coresys.core.set_state(state)
await systemd() await systemd()
evaluate.assert_not_called() evaluate.assert_not_called()
evaluate.reset_mock() evaluate.reset_mock()

View File

@ -11,7 +11,7 @@ from supervisor.resolution.evaluations.systemd_journal import EvaluateSystemdJou
async def test_evaluation(coresys: CoreSys, journald_gateway: MagicMock): async def test_evaluation(coresys: CoreSys, journald_gateway: MagicMock):
"""Test evaluation.""" """Test evaluation."""
systemd_journal = EvaluateSystemdJournal(coresys) systemd_journal = EvaluateSystemdJournal(coresys)
coresys.core.state = CoreState.SETUP await coresys.core.set_state(CoreState.SETUP)
assert systemd_journal.reason not in coresys.resolution.unsupported assert systemd_journal.reason not in coresys.resolution.unsupported
@ -38,13 +38,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None, return_value=None,
) as evaluate: ) as evaluate:
for state in should_run: for state in should_run:
coresys.core.state = state await coresys.core.set_state(state)
await systemd_journal() await systemd_journal()
evaluate.assert_called_once() evaluate.assert_called_once()
evaluate.reset_mock() evaluate.reset_mock()
for state in should_not_run: for state in should_not_run:
coresys.core.state = state await coresys.core.set_state(state)
await systemd_journal() await systemd_journal()
evaluate.assert_not_called() evaluate.assert_not_called()
evaluate.reset_mock() evaluate.reset_mock()

View File

@ -10,7 +10,7 @@ from supervisor.resolution.const import UnsupportedReason
async def test_evaluation_initialize(coresys: CoreSys): async def test_evaluation_initialize(coresys: CoreSys):
"""Test evaluation for initialize.""" """Test evaluation for initialize."""
coresys.core.state = CoreState.INITIALIZE await coresys.core.set_state(CoreState.INITIALIZE)
with ( with (
patch( patch(
"supervisor.resolution.evaluations.dbus.EvaluateDbus.evaluate", "supervisor.resolution.evaluations.dbus.EvaluateDbus.evaluate",
@ -43,7 +43,7 @@ async def test_evaluation_initialize(coresys: CoreSys):
async def test_evaluation_setup(coresys: CoreSys): async def test_evaluation_setup(coresys: CoreSys):
"""Test evaluation for setup.""" """Test evaluation for setup."""
coresys.core.state = CoreState.SETUP await coresys.core.set_state(CoreState.SETUP)
with ( with (
patch( patch(
"supervisor.resolution.evaluations.operating_system.EvaluateOperatingSystem.evaluate", "supervisor.resolution.evaluations.operating_system.EvaluateOperatingSystem.evaluate",
@ -66,7 +66,7 @@ async def test_evaluation_setup(coresys: CoreSys):
async def test_evaluation_running(coresys: CoreSys): async def test_evaluation_running(coresys: CoreSys):
"""Test evaluation for running.""" """Test evaluation for running."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
with ( with (
patch( patch(
"supervisor.resolution.evaluations.container.EvaluateContainer.evaluate", "supervisor.resolution.evaluations.container.EvaluateContainer.evaluate",
@ -84,7 +84,7 @@ async def test_evaluation_running(coresys: CoreSys):
async def test_adding_and_removing_unsupported_reason(coresys: CoreSys): async def test_adding_and_removing_unsupported_reason(coresys: CoreSys):
"""Test adding and removing unsupported reason.""" """Test adding and removing unsupported reason."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
assert UnsupportedReason.NETWORK_MANAGER not in coresys.resolution.unsupported assert UnsupportedReason.NETWORK_MANAGER not in coresys.resolution.unsupported
with patch( with patch(

View File

@ -17,7 +17,7 @@ TEST_VERSION = AwesomeVersion("1.0.0")
async def test_evaluation(coresys: CoreSys, install_addon_ssh: Addon): async def test_evaluation(coresys: CoreSys, install_addon_ssh: Addon):
"""Test evaluation.""" """Test evaluation."""
restart_policy = EvaluateRestartPolicy(coresys) restart_policy = EvaluateRestartPolicy(coresys)
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
await restart_policy() await restart_policy()
assert restart_policy.reason not in coresys.resolution.unsupported assert restart_policy.reason not in coresys.resolution.unsupported
@ -69,13 +69,13 @@ async def test_did_run(coresys: CoreSys):
return_value=False, return_value=False,
) as evaluate: ) as evaluate:
for state in should_run: for state in should_run:
coresys.core.state = state await coresys.core.set_state(state)
await restart_policy() await restart_policy()
evaluate.assert_called_once() evaluate.assert_called_once()
evaluate.reset_mock() evaluate.reset_mock()
for state in should_not_run: for state in should_not_run:
coresys.core.state = state await coresys.core.set_state(state)
await restart_policy() await restart_policy()
evaluate.assert_not_called() evaluate.assert_not_called()
evaluate.reset_mock() evaluate.reset_mock()

View File

@ -21,7 +21,7 @@ async def test_evaluation(
"""Test evaluation.""" """Test evaluation."""
systemd_service: SystemdService = all_dbus_services["systemd"] systemd_service: SystemdService = all_dbus_services["systemd"]
virtualization = EvaluateVirtualizationImage(coresys) virtualization = EvaluateVirtualizationImage(coresys)
coresys.core.state = CoreState.SETUP await coresys.core.set_state(CoreState.SETUP)
with patch( with patch(
"supervisor.os.manager.CPE.get_target_hardware", return_value=["generic-x86-64"] "supervisor.os.manager.CPE.get_target_hardware", return_value=["generic-x86-64"]
@ -53,7 +53,7 @@ async def test_evaluation_supported_images(
"""Test supported images for virtualization do not trigger unsupported.""" """Test supported images for virtualization do not trigger unsupported."""
systemd_service: SystemdService = all_dbus_services["systemd"] systemd_service: SystemdService = all_dbus_services["systemd"]
virtualization = EvaluateVirtualizationImage(coresys) virtualization = EvaluateVirtualizationImage(coresys)
coresys.core.state = CoreState.SETUP await coresys.core.set_state(CoreState.SETUP)
with patch("supervisor.os.manager.CPE.get_target_hardware", return_value=[board]): with patch("supervisor.os.manager.CPE.get_target_hardware", return_value=[board]):
systemd_service.virtualization = "vmware" systemd_service.virtualization = "vmware"
@ -77,13 +77,13 @@ async def test_did_run(coresys: CoreSys):
return_value=None, return_value=None,
) as evaluate: ) as evaluate:
for state in should_run: for state in should_run:
coresys.core.state = state await coresys.core.set_state(state)
await virtualization() await virtualization()
evaluate.assert_called_once() evaluate.assert_called_once()
evaluate.reset_mock() evaluate.reset_mock()
for state in should_not_run: for state in should_not_run:
coresys.core.state = state await coresys.core.set_state(state)
await virtualization() await virtualization()
evaluate.assert_not_called() evaluate.assert_not_called()
evaluate.reset_mock() evaluate.reset_mock()

View File

@ -12,7 +12,7 @@ from supervisor.resolution.validate import get_valid_modules
async def test_check_autofix(coresys: CoreSys): async def test_check_autofix(coresys: CoreSys):
"""Test check for setup.""" """Test check for setup."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
coresys.resolution.fixup._fixups[ coresys.resolution.fixup._fixups[
"system_create_full_backup" "system_create_full_backup"

View File

@ -10,7 +10,7 @@ from supervisor.utils import check_exception_chain
async def test_check_system_error(coresys: CoreSys, capture_exception: Mock): async def test_check_system_error(coresys: CoreSys, capture_exception: Mock):
"""Test error while checking system.""" """Test error while checking system."""
coresys.core.state = CoreState.STARTUP await coresys.core.set_state(CoreState.STARTUP)
with ( with (
patch.object(CheckCoreSecurity, "run_check", side_effect=ValueError), patch.object(CheckCoreSecurity, "run_check", side_effect=ValueError),

View File

@ -9,7 +9,7 @@ from supervisor.utils import check_exception_chain
async def test_evaluate_system_error(coresys: CoreSys, capture_exception: Mock): async def test_evaluate_system_error(coresys: CoreSys, capture_exception: Mock):
"""Test error while evaluating system.""" """Test error while evaluating system."""
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
with patch( with patch(
"supervisor.resolution.evaluations.source_mods.calc_checksum_path_sourcecode", "supervisor.resolution.evaluations.source_mods.calc_checksum_path_sourcecode",

View File

@ -5,7 +5,7 @@ import datetime
import errno import errno
from unittest.mock import AsyncMock, MagicMock, PropertyMock, patch from unittest.mock import AsyncMock, MagicMock, PropertyMock, patch
from pytest import LogCaptureFixture import pytest
from supervisor.const import CoreState from supervisor.const import CoreState
from supervisor.coresys import CoreSys from supervisor.coresys import CoreSys
@ -16,15 +16,18 @@ from supervisor.supervisor import Supervisor
from supervisor.utils.whoami import WhoamiData from supervisor.utils.whoami import WhoamiData
def test_write_state(run_supervisor_state, coresys: CoreSys): @pytest.mark.parametrize("run_supervisor_state", ["test_file"], indirect=True)
async def test_write_state(run_supervisor_state: MagicMock, coresys: CoreSys):
"""Test write corestate to /run/supervisor.""" """Test write corestate to /run/supervisor."""
coresys.core.state = CoreState.RUNNING run_supervisor_state.reset_mock()
await coresys.core.set_state(CoreState.RUNNING)
run_supervisor_state.write_text.assert_called_with( run_supervisor_state.write_text.assert_called_with(
str(CoreState.RUNNING), encoding="utf-8" str(CoreState.RUNNING), encoding="utf-8"
) )
coresys.core.state = CoreState.SHUTDOWN await coresys.core.set_state(CoreState.SHUTDOWN)
run_supervisor_state.write_text.assert_called_with( run_supervisor_state.write_text.assert_called_with(
str(CoreState.SHUTDOWN), encoding="utf-8" str(CoreState.SHUTDOWN), encoding="utf-8"
@ -87,14 +90,14 @@ async def test_adjust_system_datetime_if_time_behind(coresys: CoreSys):
mock_check_connectivity.assert_called_once() mock_check_connectivity.assert_called_once()
def test_write_state_failure( async def test_write_state_failure(
run_supervisor_state: MagicMock, coresys: CoreSys, caplog: LogCaptureFixture run_supervisor_state: MagicMock, coresys: CoreSys, caplog: pytest.LogCaptureFixture
): ):
"""Test failure to write corestate to /run/supervisor.""" """Test failure to write corestate to /run/supervisor."""
err = OSError() err = OSError()
err.errno = errno.EBADMSG err.errno = errno.EBADMSG
run_supervisor_state.write_text.side_effect = err run_supervisor_state.write_text.side_effect = err
coresys.core.state = CoreState.RUNNING await coresys.core.set_state(CoreState.RUNNING)
assert "Can't update the Supervisor state" in caplog.text assert "Can't update the Supervisor state" in caplog.text
assert coresys.core.state == CoreState.RUNNING assert coresys.core.state == CoreState.RUNNING