Add freeze/thaw apis for external snapshots (#4538)

* Add freeze/thaw apis for external backups

* Error when thaw called before freeze

* Timeout must be > 0
This commit is contained in:
Mike Degatano 2023-09-09 04:54:19 -04:00 committed by GitHub
parent 0aafda1477
commit 44daffc65b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 577 additions and 213 deletions

View File

@ -771,6 +771,43 @@ class Addon(AddonModel):
_LOGGER.error,
) from err
@Job(name="addon_begin_backup")
async def begin_backup(self) -> bool:
"""Execute pre commands or stop addon if necessary.
Returns value of `is_running`. Caller should not call `end_backup` if return is false.
"""
if not await self.is_running():
return False
if self.backup_mode == AddonBackupMode.COLD:
_LOGGER.info("Shutdown add-on %s for cold backup", self.slug)
try:
await self.instance.stop()
except DockerError as err:
raise AddonsError() from err
elif self.backup_pre is not None:
await self._backup_command(self.backup_pre)
return True
@Job(name="addon_end_backup")
async def end_backup(self) -> Awaitable[None] | None:
"""Execute post commands or restart addon if necessary.
Returns a coroutine that completes when addon has state 'started' (see start)
for cold backup. Else nothing is returned.
"""
if self.backup_mode is AddonBackupMode.COLD:
_LOGGER.info("Starting add-on %s again", self.slug)
return await self.start()
if self.backup_post is not None:
await self._backup_command(self.backup_post)
return None
@Job(name="addon_backup")
async def backup(self, tar_file: tarfile.TarFile) -> Awaitable[None] | None:
"""Backup state of an add-on.
@ -778,7 +815,6 @@ class Addon(AddonModel):
for cold backup. Else nothing is returned.
"""
wait_for_start: Awaitable[None] | None = None
is_running = await self.is_running()
with TemporaryDirectory(dir=self.sys_config.path_tmp) as temp:
temp_path = Path(temp)
@ -830,19 +866,7 @@ class Addon(AddonModel):
arcname="data",
)
if (
is_running
and self.backup_mode == AddonBackupMode.HOT
and self.backup_pre is not None
):
await self._backup_command(self.backup_pre)
elif is_running and self.backup_mode == AddonBackupMode.COLD:
_LOGGER.info("Shutdown add-on %s for cold backup", self.slug)
try:
await self.instance.stop()
except DockerError as err:
raise AddonsError() from err
is_running = await self.begin_backup()
try:
_LOGGER.info("Building backup for add-on %s", self.slug)
await self.sys_run_in_executor(_write_tarfile)
@ -851,15 +875,8 @@ class Addon(AddonModel):
f"Can't write tarfile {tar_file}: {err}", _LOGGER.error
) from err
finally:
if (
is_running
and self.backup_mode == AddonBackupMode.HOT
and self.backup_post is not None
):
await self._backup_command(self.backup_post)
elif is_running and self.backup_mode is AddonBackupMode.COLD:
_LOGGER.info("Starting add-on %s again", self.slug)
wait_for_start = await self.start()
if is_running:
wait_for_start = await self.end_backup()
_LOGGER.info("Finish backup for addon %s", self.slug)
return wait_for_start

View File

@ -485,6 +485,8 @@ class RestAPI(CoreSysAttributes):
web.get("/backups/info", api_backups.info),
web.post("/backups/options", api_backups.options),
web.post("/backups/reload", api_backups.reload),
web.post("/backups/freeze", api_backups.freeze),
web.post("/backups/thaw", api_backups.thaw),
web.post("/backups/new/full", api_backups.backup_full),
web.post("/backups/new/partial", api_backups.backup_partial),
web.post("/backups/new/upload", api_backups.upload),

View File

@ -28,6 +28,7 @@ from ..const import (
ATTR_SIZE,
ATTR_SLUG,
ATTR_SUPERVISOR_VERSION,
ATTR_TIMEOUT,
ATTR_TYPE,
ATTR_VERSION,
)
@ -80,6 +81,12 @@ SCHEMA_OPTIONS = vol.Schema(
}
)
SCHEMA_FREEZE = vol.Schema(
{
vol.Optional(ATTR_TIMEOUT): vol.All(int, vol.Range(min=1)),
}
)
class APIBackups(CoreSysAttributes):
"""Handle RESTful API for backups functions."""
@ -142,7 +149,7 @@ class APIBackups(CoreSysAttributes):
self.sys_backups.save_data()
@api_process
async def reload(self, request):
async def reload(self, _):
"""Reload backup list."""
await asyncio.shield(self.sys_backups.reload())
return True
@ -233,6 +240,17 @@ class APIBackups(CoreSysAttributes):
return await asyncio.shield(self.sys_backups.do_restore_partial(backup, **body))
@api_process
async def freeze(self, request):
"""Initiate manual freeze for external backup."""
body = await api_validate(SCHEMA_FREEZE, request)
await asyncio.shield(self.sys_backups.freeze_all(**body))
@api_process
async def thaw(self, request):
"""Begin thaw after manual freeze."""
await self.sys_backups.thaw_all()
@api_process
async def remove(self, request):
"""Remove a backup."""

View File

@ -2,6 +2,7 @@
from enum import StrEnum
BUF_SIZE = 2**20 * 4 # 4MB
DEFAULT_FREEZE_TIMEOUT = 600
class BackupType(StrEnum):

View File

@ -13,17 +13,18 @@ from ..const import (
FOLDER_HOMEASSISTANT,
CoreState,
)
from ..coresys import CoreSysAttributes
from ..dbus.const import UnitActiveState
from ..exceptions import AddonsError
from ..jobs.decorator import Job, JobCondition
from ..exceptions import AddonsError, BackupError, BackupJobError
from ..jobs.const import JOB_GROUP_BACKUP_MANAGER, JobCondition, JobExecutionLimit
from ..jobs.decorator import Job
from ..jobs.job_group import JobGroup
from ..mounts.mount import Mount
from ..utils.common import FileConfiguration
from ..utils.dt import utcnow
from ..utils.sentinel import DEFAULT
from ..utils.sentry import capture_exception
from .backup import Backup
from .const import BackupJobStage, BackupType, RestoreJobStage
from .const import DEFAULT_FREEZE_TIMEOUT, BackupJobStage, BackupType, RestoreJobStage
from .utils import create_slug
from .validate import ALL_FOLDERS, SCHEMA_BACKUPS_CONFIG
@ -42,15 +43,16 @@ def _list_backup_files(path: Path) -> Iterable[Path]:
return []
class BackupManager(FileConfiguration, CoreSysAttributes):
class BackupManager(FileConfiguration, JobGroup):
"""Manage backups."""
def __init__(self, coresys):
"""Initialize a backup manager."""
super().__init__(FILE_HASSIO_BACKUPS, SCHEMA_BACKUPS_CONFIG)
self.coresys = coresys
super(FileConfiguration, self).__init__(coresys, JOB_GROUP_BACKUP_MANAGER)
self._backups: dict[str, Backup] = {}
self.lock = asyncio.Lock()
self._thaw_task: Awaitable[None] | None = None
self._thaw_event: asyncio.Event = asyncio.Event()
@property
def list_backups(self) -> set[Backup]:
@ -92,18 +94,27 @@ class BackupManager(FileConfiguration, CoreSysAttributes):
def _change_stage(
self,
backup: Backup,
stage: BackupJobStage | RestoreJobStage,
restore: bool = False,
backup: Backup | None = None,
):
"""Change the stage of the current job during backup/restore.
Must be called from an existing backup/restore job.
"""
job_name = self.sys_jobs.current.name
if "restore" in job_name:
action = "Restore"
elif "freeze" in job_name:
action = "Freeze"
elif "thaw" in job_name:
action = "Thaw"
else:
action = "Backup"
_LOGGER.info(
"%s %s starting stage %s",
"Restore" if restore else "Backup",
backup.slug,
"%s %sstarting stage %s",
action,
f"{backup.slug} " if backup else "",
stage,
)
self.sys_jobs.current.stage = stage
@ -131,9 +142,9 @@ class BackupManager(FileConfiguration, CoreSysAttributes):
# Add backup ID to job
self.sys_jobs.current.reference = backup.slug
self._change_stage(backup, BackupJobStage.ADDON_REPOSITORIES)
self._change_stage(BackupJobStage.ADDON_REPOSITORIES, backup)
backup.store_repositories()
self._change_stage(backup, BackupJobStage.DOCKER_CONFIG)
self._change_stage(BackupJobStage.DOCKER_CONFIG, backup)
backup.store_dockerconfig()
return backup
@ -228,20 +239,20 @@ class BackupManager(FileConfiguration, CoreSysAttributes):
async with backup:
# Backup add-ons
if addon_list:
self._change_stage(backup, BackupJobStage.ADDONS)
self._change_stage(BackupJobStage.ADDONS, backup)
addon_start_tasks = await backup.store_addons(addon_list)
# HomeAssistant Folder is for v1
if homeassistant:
self._change_stage(backup, BackupJobStage.HOME_ASSISTANT)
self._change_stage(BackupJobStage.HOME_ASSISTANT, backup)
await backup.store_homeassistant()
# Backup folders
if folder_list:
self._change_stage(backup, BackupJobStage.FOLDERS)
self._change_stage(BackupJobStage.FOLDERS, backup)
await backup.store_folders(folder_list)
self._change_stage(backup, BackupJobStage.FINISHING_FILE)
self._change_stage(BackupJobStage.FINISHING_FILE, backup)
except Exception as err: # pylint: disable=broad-except
_LOGGER.exception("Backup %s error", backup.slug)
@ -251,7 +262,7 @@ class BackupManager(FileConfiguration, CoreSysAttributes):
self._backups[backup.slug] = backup
if addon_start_tasks:
self._change_stage(backup, BackupJobStage.AWAIT_ADDON_RESTARTS)
self._change_stage(BackupJobStage.AWAIT_ADDON_RESTARTS, backup)
# Ignore exceptions from waiting for addon startup, addon errors handled elsewhere
await asyncio.gather(*addon_start_tasks, return_exceptions=True)
@ -262,6 +273,8 @@ class BackupManager(FileConfiguration, CoreSysAttributes):
@Job(
name="backup_manager_full_backup",
conditions=[JobCondition.FREE_SPACE, JobCondition.RUNNING],
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=BackupJobError,
)
async def do_backup_full(
self,
@ -271,26 +284,23 @@ class BackupManager(FileConfiguration, CoreSysAttributes):
location: Mount | type[DEFAULT] | None = DEFAULT,
) -> Backup | None:
"""Create a full backup."""
if self.lock.locked():
_LOGGER.error("A backup/restore process is already running")
return None
backup = self._create_backup(
name, BackupType.FULL, password, compressed, location
)
_LOGGER.info("Creating new full backup with slug %s", backup.slug)
async with self.lock:
backup = await self._do_backup(
backup, self.sys_addons.installed, ALL_FOLDERS, True
)
if backup:
_LOGGER.info("Creating full backup with slug %s completed", backup.slug)
return backup
backup = await self._do_backup(
backup, self.sys_addons.installed, ALL_FOLDERS, True
)
if backup:
_LOGGER.info("Creating full backup with slug %s completed", backup.slug)
return backup
@Job(
name="backup_manager_partial_backup",
conditions=[JobCondition.FREE_SPACE, JobCondition.RUNNING],
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=BackupJobError,
)
async def do_backup_partial(
self,
@ -303,10 +313,6 @@ class BackupManager(FileConfiguration, CoreSysAttributes):
location: Mount | type[DEFAULT] | None = DEFAULT,
) -> Backup | None:
"""Create a partial backup."""
if self.lock.locked():
_LOGGER.error("A backup/restore process is already running")
return None
addons = addons or []
folders = folders or []
@ -323,21 +329,18 @@ class BackupManager(FileConfiguration, CoreSysAttributes):
)
_LOGGER.info("Creating new partial backup with slug %s", backup.slug)
async with self.lock:
addon_list = []
for addon_slug in addons:
addon = self.sys_addons.get(addon_slug)
if addon and addon.is_installed:
addon_list.append(addon)
continue
_LOGGER.warning("Add-on %s not found/installed", addon_slug)
addon_list = []
for addon_slug in addons:
addon = self.sys_addons.get(addon_slug)
if addon and addon.is_installed:
addon_list.append(addon)
continue
_LOGGER.warning("Add-on %s not found/installed", addon_slug)
backup = await self._do_backup(backup, addon_list, folders, homeassistant)
if backup:
_LOGGER.info(
"Creating partial backup with slug %s completed", backup.slug
)
return backup
backup = await self._do_backup(backup, addon_list, folders, homeassistant)
if backup:
_LOGGER.info("Creating partial backup with slug %s completed", backup.slug)
return backup
async def _do_restore(
self,
@ -357,26 +360,22 @@ class BackupManager(FileConfiguration, CoreSysAttributes):
task_hass: asyncio.Task | None = None
async with backup:
# Restore docker config
self._change_stage(backup, RestoreJobStage.DOCKER_CONFIG, restore=True)
self._change_stage(RestoreJobStage.DOCKER_CONFIG, backup)
backup.restore_dockerconfig(replace)
# Process folders
if folder_list:
self._change_stage(backup, RestoreJobStage.FOLDERS, restore=True)
self._change_stage(RestoreJobStage.FOLDERS, backup)
await backup.restore_folders(folder_list)
# Process Home-Assistant
if homeassistant:
self._change_stage(
backup, RestoreJobStage.HOME_ASSISTANT, restore=True
)
self._change_stage(RestoreJobStage.HOME_ASSISTANT, backup)
task_hass = await backup.restore_homeassistant()
# Delete delta add-ons
if replace:
self._change_stage(
backup, RestoreJobStage.REMOVE_DELTA_ADDONS, restore=True
)
self._change_stage(RestoreJobStage.REMOVE_DELTA_ADDONS, backup)
for addon in self.sys_addons.installed:
if addon.slug in backup.addon_list:
continue
@ -389,20 +388,16 @@ class BackupManager(FileConfiguration, CoreSysAttributes):
_LOGGER.warning("Can't uninstall Add-on %s", addon.slug)
if addon_list:
self._change_stage(
backup, RestoreJobStage.ADDON_REPOSITORIES, restore=True
)
self._change_stage(RestoreJobStage.ADDON_REPOSITORIES, backup)
await backup.restore_repositories(replace)
self._change_stage(backup, RestoreJobStage.ADDONS, restore=True)
self._change_stage(RestoreJobStage.ADDONS, backup)
addon_start_tasks = await backup.restore_addons(addon_list)
# Wait for Home Assistant Core update/downgrade
if task_hass:
self._change_stage(
backup,
RestoreJobStage.AWAIT_HOME_ASSISTANT_RESTART,
restore=True,
RestoreJobStage.AWAIT_HOME_ASSISTANT_RESTART, backup
)
await task_hass
@ -412,9 +407,7 @@ class BackupManager(FileConfiguration, CoreSysAttributes):
return False
else:
if addon_start_tasks:
self._change_stage(
backup, RestoreJobStage.AWAIT_ADDON_RESTARTS, restore=True
)
self._change_stage(RestoreJobStage.AWAIT_ADDON_RESTARTS, backup)
# Ignore exceptions from waiting for addon startup, addon errors handled elsewhere
await asyncio.gather(*addon_start_tasks, return_exceptions=True)
@ -422,9 +415,7 @@ class BackupManager(FileConfiguration, CoreSysAttributes):
finally:
# Leave Home Assistant alone if it wasn't part of the restore
if homeassistant:
self._change_stage(
backup, RestoreJobStage.CHECK_HOME_ASSISTANT, restore=True
)
self._change_stage(RestoreJobStage.CHECK_HOME_ASSISTANT, backup)
# Do we need start Home Assistant Core?
if not await self.sys_homeassistant.core.is_running():
@ -444,6 +435,8 @@ class BackupManager(FileConfiguration, CoreSysAttributes):
JobCondition.INTERNET_SYSTEM,
JobCondition.RUNNING,
],
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=BackupJobError,
)
async def do_restore_full(
self, backup: Backup, password: str | None = None
@ -452,10 +445,6 @@ class BackupManager(FileConfiguration, CoreSysAttributes):
# Add backup ID to job
self.sys_jobs.current.reference = backup.slug
if self.lock.locked():
_LOGGER.error("A backup/restore process is already running")
return False
if backup.sys_type != BackupType.FULL:
_LOGGER.error("%s is only a partial backup!", backup.slug)
return False
@ -473,21 +462,20 @@ class BackupManager(FileConfiguration, CoreSysAttributes):
return False
_LOGGER.info("Full-Restore %s start", backup.slug)
async with self.lock:
self.sys_core.state = CoreState.FREEZE
self.sys_core.state = CoreState.FREEZE
# Stop Home-Assistant / Add-ons
await self.sys_core.shutdown()
# Stop Home-Assistant / Add-ons
await self.sys_core.shutdown()
success = await self._do_restore(
backup, backup.addon_list, backup.folders, True, True
)
success = await self._do_restore(
backup, backup.addon_list, backup.folders, True, True
)
self.sys_core.state = CoreState.RUNNING
self.sys_core.state = CoreState.RUNNING
if success:
_LOGGER.info("Full-Restore %s done", backup.slug)
return success
if success:
_LOGGER.info("Full-Restore %s done", backup.slug)
return success
@Job(
name="backup_manager_partial_restore",
@ -498,6 +486,8 @@ class BackupManager(FileConfiguration, CoreSysAttributes):
JobCondition.INTERNET_SYSTEM,
JobCondition.RUNNING,
],
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=BackupJobError,
)
async def do_restore_partial(
self,
@ -511,10 +501,6 @@ class BackupManager(FileConfiguration, CoreSysAttributes):
# Add backup ID to job
self.sys_jobs.current.reference = backup.slug
if self.lock.locked():
_LOGGER.error("A backup/restore process is already running")
return False
addon_list = addons or []
folder_list = folders or []
@ -540,15 +526,100 @@ class BackupManager(FileConfiguration, CoreSysAttributes):
return False
_LOGGER.info("Partial-Restore %s start", backup.slug)
async with self.lock:
self.sys_core.state = CoreState.FREEZE
self.sys_core.state = CoreState.FREEZE
success = await self._do_restore(
backup, addon_list, folder_list, homeassistant, False
success = await self._do_restore(
backup, addon_list, folder_list, homeassistant, False
)
self.sys_core.state = CoreState.RUNNING
if success:
_LOGGER.info("Partial-Restore %s done", backup.slug)
return success
@Job(
name="backup_manager_freeze_all",
conditions=[JobCondition.RUNNING],
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=BackupJobError,
)
async def freeze_all(self, timeout: float = DEFAULT_FREEZE_TIMEOUT) -> None:
"""Freeze system to prepare for an external backup such as an image snapshot."""
self.sys_core.state = CoreState.FREEZE
# Determine running addons
installed = self.sys_addons.installed.copy()
is_running: list[bool] = await asyncio.gather(
*[addon.is_running() for addon in installed]
)
running_addons = [
installed[ind] for ind in range(len(installed)) if is_running[ind]
]
# Create thaw task first to ensure we eventually undo freezes even if the below fails
self._thaw_task = asyncio.shield(
self.sys_create_task(self._thaw_all(running_addons, timeout))
)
# Tell Home Assistant to freeze for a backup
self._change_stage(BackupJobStage.HOME_ASSISTANT)
await self.sys_homeassistant.begin_backup()
# Run all pre-backup tasks for addons
self._change_stage(BackupJobStage.ADDONS)
await asyncio.gather(*[addon.begin_backup() for addon in running_addons])
@Job(
name="backup_manager_thaw_all",
conditions=[JobCondition.FROZEN],
on_condition=BackupJobError,
)
async def _thaw_all(
self, running_addons: list[Addon], timeout: float = DEFAULT_FREEZE_TIMEOUT
) -> None:
"""Thaw system after user signal or timeout."""
try:
try:
await asyncio.wait_for(self._thaw_event.wait(), timeout)
except asyncio.TimeoutError:
_LOGGER.warning(
"Timeout waiting for signal to thaw after manual freeze, beginning thaw now"
)
self._change_stage(BackupJobStage.HOME_ASSISTANT)
await self.sys_homeassistant.end_backup()
self._change_stage(BackupJobStage.ADDONS)
addon_start_tasks: list[Awaitable[None]] = [
task
for task in await asyncio.gather(
*[addon.end_backup() for addon in running_addons]
)
if task
]
finally:
self.sys_core.state = CoreState.RUNNING
self._thaw_event.clear()
self._thaw_task = None
if addon_start_tasks:
self._change_stage(BackupJobStage.AWAIT_ADDON_RESTARTS)
await asyncio.gather(*addon_start_tasks, return_exceptions=True)
@Job(
name="backup_manager_signal_thaw",
conditions=[JobCondition.FROZEN],
limit=JobExecutionLimit.GROUP_ONCE,
on_condition=BackupJobError,
internal=True,
)
async def thaw_all(self) -> None:
"""Signal thaw task to begin unfreezing the system."""
if not self._thaw_task:
raise BackupError(
"Freeze was not initiated by freeze API, cannot thaw this way"
)
self.sys_core.state = CoreState.RUNNING
if success:
_LOGGER.info("Partial-Restore %s done", backup.slug)
return success
self._thaw_event.set()
await self._thaw_task

View File

@ -3,6 +3,7 @@ from __future__ import annotations
import asyncio
from collections.abc import Callable, Coroutine
from contextvars import Context, copy_context
from datetime import datetime
from functools import partial
import logging
@ -99,6 +100,9 @@ class CoreSys:
{aiohttp.hdrs.USER_AGENT: SERVER_SOFTWARE}
)
# Task factory attributes
self._set_task_context: list[Callable[[Context], Context]] = []
@property
def dev(self) -> bool:
"""Return True if we run dev mode."""
@ -520,6 +524,17 @@ class CoreSys:
"""Return now in local timezone."""
return datetime.now(get_time_zone(self.timezone) or UTC)
def add_set_task_context_callback(
self, callback: Callable[[Context], Context]
) -> None:
"""Add callback used to modify context prior to creating a task.
Only used for tasks created via CoreSys.create_task. Callback can modify the provided
context using context.run (ex. `context.run(var.set, "new_value")`). Callback should
return the context to be provided to task.
"""
self._set_task_context.append(callback)
def run_in_executor(
self, funct: Callable[..., T], *args: tuple[Any], **kwargs: dict[str, Any]
) -> Coroutine[Any, Any, T]:
@ -531,7 +546,11 @@ class CoreSys:
def create_task(self, coroutine: Coroutine) -> asyncio.Task:
"""Create an async task."""
return self.loop.create_task(coroutine)
context = copy_context()
for callback in self._set_task_context:
context = callback(context)
return self.loop.create_task(coroutine, context=context)
class CoreSysAttributes:

View File

@ -581,6 +581,10 @@ class HomeAssistantBackupError(BackupError, HomeAssistantError):
"""Raise if an error during Home Assistant Core backup is happening."""
class BackupJobError(BackupError, JobException):
"""Raise on Backup job error."""
# Security

View File

@ -306,62 +306,70 @@ class HomeAssistant(FileConfiguration, CoreSysAttributes):
self.sys_homeassistant.websocket.send_message({ATTR_TYPE: "usb/scan"})
@Job(name="home_assistant_module_backup")
async def backup(self, tar_file: tarfile.TarFile) -> None:
"""Backup Home Assistant Core config/ directory."""
# Let Home Assistant Core know we are about to backup
@Job(name="home_assistant_module_begin_backup")
async def begin_backup(self) -> None:
"""Inform Home Assistant a backup is beginning."""
try:
await self.websocket.async_send_command({ATTR_TYPE: WSType.BACKUP_START})
except HomeAssistantWSError:
_LOGGER.warning(
"Preparing backup of Home Assistant Core failed. Check HA Core logs."
)
with TemporaryDirectory(dir=self.sys_config.path_tmp) as temp:
temp_path = Path(temp)
@Job(name="home_assistant_module_end_backup")
async def end_backup(self) -> None:
"""Inform Home Assistant the backup is ending."""
try:
await self.websocket.async_send_command({ATTR_TYPE: WSType.BACKUP_END})
except HomeAssistantWSError:
_LOGGER.warning(
"Error during Home Assistant Core backup. Check HA Core logs."
)
# Store local configs/state
try:
write_json_file(temp_path.joinpath("homeassistant.json"), self._data)
except ConfigurationFileError as err:
raise HomeAssistantError(
f"Can't save meta for Home Assistant Core: {err!s}", _LOGGER.error
) from err
@Job(name="home_assistant_module_backup")
async def backup(self, tar_file: tarfile.TarFile) -> None:
"""Backup Home Assistant Core config/ directory."""
await self.begin_backup()
try:
with TemporaryDirectory(dir=self.sys_config.path_tmp) as temp:
temp_path = Path(temp)
# Backup data config folder
def _write_tarfile():
with tar_file as backup:
# Backup metadata
backup.add(temp, arcname=".")
# Backup data
atomic_contents_add(
backup,
self.sys_config.path_homeassistant,
excludes=HOMEASSISTANT_BACKUP_EXCLUDE,
arcname="data",
)
try:
_LOGGER.info("Backing up Home Assistant Core config folder")
await self.sys_run_in_executor(_write_tarfile)
_LOGGER.info("Backup Home Assistant Core config folder done")
except (tarfile.TarError, OSError) as err:
raise HomeAssistantBackupError(
f"Can't backup Home Assistant Core config folder: {str(err)}",
_LOGGER.error,
) from err
finally:
# Store local configs/state
try:
await self.sys_homeassistant.websocket.async_send_command(
{ATTR_TYPE: WSType.BACKUP_END}
)
except HomeAssistantWSError:
_LOGGER.warning(
"Error during Home Assistant Core backup. Check HA Core logs."
write_json_file(
temp_path.joinpath("homeassistant.json"), self._data
)
except ConfigurationFileError as err:
raise HomeAssistantError(
f"Can't save meta for Home Assistant Core: {err!s}",
_LOGGER.error,
) from err
# Backup data config folder
def _write_tarfile():
with tar_file as backup:
# Backup metadata
backup.add(temp, arcname=".")
# Backup data
atomic_contents_add(
backup,
self.sys_config.path_homeassistant,
excludes=HOMEASSISTANT_BACKUP_EXCLUDE,
arcname="data",
)
try:
_LOGGER.info("Backing up Home Assistant Core config folder")
await self.sys_run_in_executor(_write_tarfile)
_LOGGER.info("Backup Home Assistant Core config folder done")
except (tarfile.TarError, OSError) as err:
raise HomeAssistantBackupError(
f"Can't backup Home Assistant Core config folder: {str(err)}",
_LOGGER.error,
) from err
finally:
await self.end_backup()
async def restore(self, tar_file: tarfile.TarFile) -> None:
"""Restore Home Assistant Core config/ directory."""

View File

@ -1,7 +1,7 @@
"""Supervisor job manager."""
from collections.abc import Callable
from contextlib import contextmanager
from contextvars import ContextVar, Token
from contextvars import Context, ContextVar, Token
import logging
from typing import Any
from uuid import UUID, uuid4
@ -27,6 +27,12 @@ _CURRENT_JOB: ContextVar[UUID] = ContextVar("current_job")
_LOGGER: logging.Logger = logging.getLogger(__name__)
def _remove_current_job(context: Context) -> Context:
"""Remove the current job from the context."""
context.run(_CURRENT_JOB.set, None)
return context
def _invalid_if_done(instance: "SupervisorJob", *_) -> None:
"""Validate that job is not done."""
if instance.done:
@ -114,6 +120,9 @@ class JobManager(FileConfiguration, CoreSysAttributes):
self.coresys: CoreSys = coresys
self._jobs: dict[str, SupervisorJob] = {}
# Ensure tasks created via CoreSys.create_task do not have a parent
self.coresys.add_set_task_context_callback(_remove_current_job)
@property
def jobs(self) -> list[SupervisorJob]:
"""Return a list of current jobs."""

View File

@ -9,6 +9,7 @@ FILE_CONFIG_JOBS = Path(SUPERVISOR_DATA, "jobs.json")
ATTR_IGNORE_CONDITIONS = "ignore_conditions"
JOB_GROUP_ADDON = "addon_{slug}"
JOB_GROUP_BACKUP_MANAGER = "backup_manager"
JOB_GROUP_DOCKER_INTERFACE = "container_{name}"
JOB_GROUP_HOME_ASSISTANT_CORE = "home_assistant_core"
@ -18,6 +19,7 @@ class JobCondition(StrEnum):
AUTO_UPDATE = "auto_update"
FREE_SPACE = "free_space"
FROZEN = "frozen"
HAOS = "haos"
HEALTHY = "healthy"
HOST_NETWORK = "host_network"

View File

@ -307,6 +307,14 @@ class Job(CoreSysAttributes):
f"'{self._method.__qualname__}' blocked from execution, system is not running - {self.sys_core.state!s}"
)
if (
JobCondition.FROZEN in used_conditions
and self.sys_core.state != CoreState.FREEZE
):
raise JobConditionException(
f"'{self._method.__qualname__}' blocked from execution, system is not frozen - {self.sys_core.state!s}"
)
if (
JobCondition.FREE_SPACE in used_conditions
and self.sys_host.info.free_space < MINIMUM_FREE_SPACE_THRESHOLD

View File

@ -1,9 +1,11 @@
"""Test backups API."""
import asyncio
from pathlib import Path, PurePath
from unittest.mock import patch
from unittest.mock import AsyncMock, patch
from aiohttp.test_utils import TestClient
from awesomeversion import AwesomeVersion
import pytest
from supervisor.backups.backup import Backup
@ -135,3 +137,33 @@ async def test_backup_to_default(
slug = result["data"]["slug"]
assert (mount_dir / f"{slug}.tar").exists()
async def test_api_freeze_thaw(
api_client: TestClient,
coresys: CoreSys,
ha_ws_client: AsyncMock,
tmp_supervisor_data,
path_extern,
):
"""Test manual freeze and thaw for external backup via API."""
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
ha_ws_client.ha_version = AwesomeVersion("2022.1.0")
await api_client.post("/backups/freeze")
assert coresys.core.state == CoreState.FREEZE
await asyncio.sleep(0)
assert any(
call.args[0] == {"type": "backup/start"}
for call in ha_ws_client.async_send_command.call_args_list
)
ha_ws_client.async_send_command.reset_mock()
await api_client.post("/backups/thaw")
assert coresys.core.state == CoreState.RUNNING
await asyncio.sleep(0)
assert any(
call.args[0] == {"type": "backup/end"}
for call in ha_ws_client.async_send_command.call_args_list
)

View File

@ -20,7 +20,7 @@ from supervisor.docker.addon import DockerAddon
from supervisor.docker.const import ContainerState
from supervisor.docker.homeassistant import DockerHomeAssistant
from supervisor.docker.monitor import DockerContainerStateEvent
from supervisor.exceptions import AddonsError, DockerError
from supervisor.exceptions import AddonsError, BackupError, DockerError
from supervisor.homeassistant.core import HomeAssistantCore
from supervisor.homeassistant.module import HomeAssistant
from supervisor.mounts.mount import Mount
@ -832,8 +832,7 @@ async def test_restore_with_healthcheck(
def _make_backup_message_for_assert(
*,
full: bool = True,
restore: bool = False,
action: str = "full_backup",
reference: str,
stage: str | None,
done: bool = False,
@ -844,7 +843,7 @@ def _make_backup_message_for_assert(
"data": {
"event": "job",
"data": {
"name": f"backup_manager_{'full' if full else 'partial'}_{'restore' if restore else 'backup'}",
"name": f"backup_manager_{action}",
"reference": reference,
"uuid": ANY,
"progress": 0,
@ -920,27 +919,35 @@ async def test_backup_progress(
== "backup_manager_partial_backup"
]
assert messages == [
_make_backup_message_for_assert(full=False, reference=None, stage=None),
_make_backup_message_for_assert(
full=False, reference=partial_backup.slug, stage=None
action="partial_backup", reference=None, stage=None
),
_make_backup_message_for_assert(
full=False, reference=partial_backup.slug, stage="addon_repositories"
action="partial_backup", reference=partial_backup.slug, stage=None
),
_make_backup_message_for_assert(
full=False, reference=partial_backup.slug, stage="docker_config"
action="partial_backup",
reference=partial_backup.slug,
stage="addon_repositories",
),
_make_backup_message_for_assert(
full=False, reference=partial_backup.slug, stage="addons"
action="partial_backup",
reference=partial_backup.slug,
stage="docker_config",
),
_make_backup_message_for_assert(
full=False, reference=partial_backup.slug, stage="folders"
action="partial_backup", reference=partial_backup.slug, stage="addons"
),
_make_backup_message_for_assert(
full=False, reference=partial_backup.slug, stage="finishing_file"
action="partial_backup", reference=partial_backup.slug, stage="folders"
),
_make_backup_message_for_assert(
full=False,
action="partial_backup",
reference=partial_backup.slug,
stage="finishing_file",
),
_make_backup_message_for_assert(
action="partial_backup",
reference=partial_backup.slug,
stage="finishing_file",
done=True,
@ -987,51 +994,53 @@ async def test_restore_progress(
== "backup_manager_full_restore"
]
assert messages == [
_make_backup_message_for_assert(restore=True, reference=None, stage=None),
_make_backup_message_for_assert(
restore=True, reference=full_backup.slug, stage=None
action="full_restore", reference=None, stage=None
),
_make_backup_message_for_assert(
restore=True, reference=full_backup.slug, stage="docker_config"
action="full_restore", reference=full_backup.slug, stage=None
),
_make_backup_message_for_assert(
restore=True, reference=full_backup.slug, stage="folders"
action="full_restore", reference=full_backup.slug, stage="docker_config"
),
_make_backup_message_for_assert(
restore=True,
action="full_restore", reference=full_backup.slug, stage="folders"
),
_make_backup_message_for_assert(
action="full_restore",
reference=full_backup.slug,
stage="home_assistant",
),
_make_backup_message_for_assert(
restore=True,
action="full_restore",
reference=full_backup.slug,
stage="remove_delta_addons",
),
_make_backup_message_for_assert(
restore=True,
action="full_restore",
reference=full_backup.slug,
stage="addon_repositories",
),
_make_backup_message_for_assert(
restore=True, reference=full_backup.slug, stage="addons"
action="full_restore", reference=full_backup.slug, stage="addons"
),
_make_backup_message_for_assert(
restore=True,
action="full_restore",
reference=full_backup.slug,
stage="await_home_assistant_restart",
),
_make_backup_message_for_assert(
restore=True,
action="full_restore",
reference=full_backup.slug,
stage="await_addon_restarts",
),
_make_backup_message_for_assert(
restore=True,
action="full_restore",
reference=full_backup.slug,
stage="check_home_assistant",
),
_make_backup_message_for_assert(
restore=True,
action="full_restore",
reference=full_backup.slug,
stage="check_home_assistant",
done=True,
@ -1055,29 +1064,25 @@ async def test_restore_progress(
]
assert messages == [
_make_backup_message_for_assert(
full=False, restore=True, reference=None, stage=None
action="partial_restore", reference=None, stage=None
),
_make_backup_message_for_assert(
full=False,
restore=True,
action="partial_restore",
reference=folders_backup.slug,
stage=None,
),
_make_backup_message_for_assert(
full=False,
restore=True,
action="partial_restore",
reference=folders_backup.slug,
stage="docker_config",
),
_make_backup_message_for_assert(
full=False,
restore=True,
action="partial_restore",
reference=folders_backup.slug,
stage="folders",
),
_make_backup_message_for_assert(
full=False,
restore=True,
action="partial_restore",
reference=folders_backup.slug,
stage="folders",
done=True,
@ -1103,37 +1108,151 @@ async def test_restore_progress(
]
assert messages == [
_make_backup_message_for_assert(
full=False, restore=True, reference=None, stage=None
action="partial_restore", reference=None, stage=None
),
_make_backup_message_for_assert(
full=False,
restore=True,
action="partial_restore",
reference=addon_backup.slug,
stage=None,
),
_make_backup_message_for_assert(
full=False,
restore=True,
action="partial_restore",
reference=addon_backup.slug,
stage="docker_config",
),
_make_backup_message_for_assert(
full=False,
restore=True,
action="partial_restore",
reference=addon_backup.slug,
stage="addon_repositories",
),
_make_backup_message_for_assert(
full=False,
restore=True,
action="partial_restore",
reference=addon_backup.slug,
stage="addons",
),
_make_backup_message_for_assert(
full=False,
restore=True,
action="partial_restore",
reference=addon_backup.slug,
stage="addons",
done=True,
),
]
async def test_freeze_thaw(
coresys: CoreSys,
install_addon_ssh: Addon,
container: MagicMock,
ha_ws_client: AsyncMock,
tmp_supervisor_data,
path_extern,
):
"""Test manual freeze and thaw for external snapshots."""
container.status = "running"
install_addon_ssh.path_data.mkdir()
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
container.exec_run.return_value = (0, None)
ha_ws_client.ha_version = AwesomeVersion("2022.1.0")
with patch.object(
AddonModel, "backup_pre", new=PropertyMock(return_value="pre_backup")
), patch.object(
AddonModel, "backup_post", new=PropertyMock(return_value="post_backup")
):
# Run the freeze
await coresys.backups.freeze_all()
container.exec_run.assert_called_once_with("pre_backup")
assert coresys.core.state == CoreState.FREEZE
await asyncio.sleep(0)
messages = [
call.args[0]
for call in ha_ws_client.async_send_command.call_args_list
if call.args[0]["type"] in ["backup/start", "backup/end"]
or call.args[0]["data"].get("data", {}).get("name")
in ["backup_manager_freeze_all", "backup_manager_thaw_all"]
]
assert messages == [
_make_backup_message_for_assert(
action="freeze_all", reference=None, stage=None
),
{"type": "backup/start"},
_make_backup_message_for_assert(
action="freeze_all", reference=None, stage="home_assistant"
),
_make_backup_message_for_assert(
action="freeze_all", reference=None, stage="addons"
),
_make_backup_message_for_assert(
action="thaw_all", reference=None, stage=None
),
_make_backup_message_for_assert(
action="freeze_all", reference=None, stage="addons", done=True
),
]
# Release the thaw task
container.exec_run.reset_mock()
ha_ws_client.async_send_command.reset_mock()
await coresys.backups.thaw_all()
container.exec_run.assert_called_once_with("post_backup")
assert coresys.core.state == CoreState.RUNNING
await asyncio.sleep(0)
messages = [
call.args[0]
for call in ha_ws_client.async_send_command.call_args_list
if call.args[0]["type"] in ["backup/start", "backup/end"]
or call.args[0]["data"].get("data", {}).get("name")
in ["backup_manager_freeze_all", "backup_manager_thaw_all"]
]
assert messages == [
{"type": "backup/end"},
_make_backup_message_for_assert(
action="thaw_all", reference=None, stage="home_assistant"
),
_make_backup_message_for_assert(
action="thaw_all", reference=None, stage="addons"
),
_make_backup_message_for_assert(
action="thaw_all", reference=None, stage="addons", done=True
),
]
async def test_freeze_thaw_timeout(
coresys: CoreSys,
ha_ws_client: AsyncMock,
caplog: pytest.LogCaptureFixture,
tmp_supervisor_data,
path_extern,
):
"""Test manual freeze ends due to timeout expiration."""
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
ha_ws_client.ha_version = AwesomeVersion("2022.1.0")
await coresys.backups.freeze_all(timeout=0.01)
assert coresys.core.state == CoreState.FREEZE
await asyncio.sleep(0)
assert any(
call.args[0] == {"type": "backup/start"}
for call in ha_ws_client.async_send_command.call_args_list
)
ha_ws_client.async_send_command.reset_mock()
await asyncio.sleep(0.02)
assert coresys.core.state == CoreState.RUNNING
assert any(
call.args[0] == {"type": "backup/end"}
for call in ha_ws_client.async_send_command.call_args_list
)
assert "Timeout waiting for signal to thaw after manual freeze" in caplog.text
async def test_cannot_manually_thaw_normal_freeze(coresys: CoreSys):
"""Test thaw_all cannot be used unless freeze was started by freeze_all method."""
coresys.core.state = CoreState.FREEZE
with pytest.raises(BackupError):
await coresys.backups.thaw_all()

View File

@ -990,3 +990,57 @@ async def test_internal_jobs_no_notify(coresys: CoreSys):
},
}
)
async def test_job_starting_separate_task(coresys: CoreSys):
"""Test job that starts a job as a separate asyncio task."""
class TestClass(JobGroup):
"""Test class."""
def __init__(self, coresys: CoreSys) -> None:
super().__init__(coresys, "test_class_locking")
self.event = asyncio.Event()
@Job(
name="test_job_starting_separate_task_job_task",
limit=JobExecutionLimit.GROUP_ONCE,
)
async def job_task(self):
"""Create a separate long running job task."""
self.sys_jobs.current.stage = "launch_task"
return self.sys_create_task(self.job_task_inner())
@Job(name="test_job_starting_separate_task_job_task_inner")
async def job_task_inner(self):
"""Check & update job and wait for release."""
assert self.sys_jobs.current.parent_id is None
self.sys_jobs.current.stage = "start"
await self.event.wait()
self.sys_jobs.current.stage = "end"
@Job(name="test_job_starting_separate_task_release")
async def job_release(self):
"""Release inner task."""
self.event.set()
@Job(
name="test_job_starting_separate_task_job_await",
limit=JobExecutionLimit.GROUP_ONCE,
)
async def job_await(self):
"""Await a simple job in same group to confirm lock released."""
await self.job_await_inner()
@Job(name="test_job_starting_separate_task_job_await_inner")
async def job_await_inner(self):
"""Confirm there is a parent this way."""
assert self.sys_jobs.current.parent_id is not None
test = TestClass(coresys)
task = await test.job_task()
await asyncio.sleep(0)
await test.job_await()
await test.job_release()
await task