From 44daffc65b009014de3fde7ff99c517656a13bd6 Mon Sep 17 00:00:00 2001 From: Mike Degatano Date: Sat, 9 Sep 2023 04:54:19 -0400 Subject: [PATCH] Add freeze/thaw apis for external snapshots (#4538) * Add freeze/thaw apis for external backups * Error when thaw called before freeze * Timeout must be > 0 --- supervisor/addons/addon.py | 63 ++++--- supervisor/api/__init__.py | 2 + supervisor/api/backups.py | 20 ++- supervisor/backups/const.py | 1 + supervisor/backups/manager.py | 265 ++++++++++++++++++----------- supervisor/coresys.py | 21 ++- supervisor/exceptions.py | 4 + supervisor/homeassistant/module.py | 98 ++++++----- supervisor/jobs/__init__.py | 11 +- supervisor/jobs/const.py | 2 + supervisor/jobs/decorator.py | 8 + tests/api/test_backups.py | 34 +++- tests/backups/test_manager.py | 207 +++++++++++++++++----- tests/jobs/test_job_decorator.py | 54 ++++++ 14 files changed, 577 insertions(+), 213 deletions(-) diff --git a/supervisor/addons/addon.py b/supervisor/addons/addon.py index 8a0065d18..82312d57b 100644 --- a/supervisor/addons/addon.py +++ b/supervisor/addons/addon.py @@ -771,6 +771,43 @@ class Addon(AddonModel): _LOGGER.error, ) from err + @Job(name="addon_begin_backup") + async def begin_backup(self) -> bool: + """Execute pre commands or stop addon if necessary. + + Returns value of `is_running`. Caller should not call `end_backup` if return is false. + """ + if not await self.is_running(): + return False + + if self.backup_mode == AddonBackupMode.COLD: + _LOGGER.info("Shutdown add-on %s for cold backup", self.slug) + try: + await self.instance.stop() + except DockerError as err: + raise AddonsError() from err + + elif self.backup_pre is not None: + await self._backup_command(self.backup_pre) + + return True + + @Job(name="addon_end_backup") + async def end_backup(self) -> Awaitable[None] | None: + """Execute post commands or restart addon if necessary. + + Returns a coroutine that completes when addon has state 'started' (see start) + for cold backup. Else nothing is returned. + """ + if self.backup_mode is AddonBackupMode.COLD: + _LOGGER.info("Starting add-on %s again", self.slug) + return await self.start() + + if self.backup_post is not None: + await self._backup_command(self.backup_post) + return None + + @Job(name="addon_backup") async def backup(self, tar_file: tarfile.TarFile) -> Awaitable[None] | None: """Backup state of an add-on. @@ -778,7 +815,6 @@ class Addon(AddonModel): for cold backup. Else nothing is returned. """ wait_for_start: Awaitable[None] | None = None - is_running = await self.is_running() with TemporaryDirectory(dir=self.sys_config.path_tmp) as temp: temp_path = Path(temp) @@ -830,19 +866,7 @@ class Addon(AddonModel): arcname="data", ) - if ( - is_running - and self.backup_mode == AddonBackupMode.HOT - and self.backup_pre is not None - ): - await self._backup_command(self.backup_pre) - elif is_running and self.backup_mode == AddonBackupMode.COLD: - _LOGGER.info("Shutdown add-on %s for cold backup", self.slug) - try: - await self.instance.stop() - except DockerError as err: - raise AddonsError() from err - + is_running = await self.begin_backup() try: _LOGGER.info("Building backup for add-on %s", self.slug) await self.sys_run_in_executor(_write_tarfile) @@ -851,15 +875,8 @@ class Addon(AddonModel): f"Can't write tarfile {tar_file}: {err}", _LOGGER.error ) from err finally: - if ( - is_running - and self.backup_mode == AddonBackupMode.HOT - and self.backup_post is not None - ): - await self._backup_command(self.backup_post) - elif is_running and self.backup_mode is AddonBackupMode.COLD: - _LOGGER.info("Starting add-on %s again", self.slug) - wait_for_start = await self.start() + if is_running: + wait_for_start = await self.end_backup() _LOGGER.info("Finish backup for addon %s", self.slug) return wait_for_start diff --git a/supervisor/api/__init__.py b/supervisor/api/__init__.py index a80e5ba37..46942e2fa 100644 --- a/supervisor/api/__init__.py +++ b/supervisor/api/__init__.py @@ -485,6 +485,8 @@ class RestAPI(CoreSysAttributes): web.get("/backups/info", api_backups.info), web.post("/backups/options", api_backups.options), web.post("/backups/reload", api_backups.reload), + web.post("/backups/freeze", api_backups.freeze), + web.post("/backups/thaw", api_backups.thaw), web.post("/backups/new/full", api_backups.backup_full), web.post("/backups/new/partial", api_backups.backup_partial), web.post("/backups/new/upload", api_backups.upload), diff --git a/supervisor/api/backups.py b/supervisor/api/backups.py index b4334ee09..93f266875 100644 --- a/supervisor/api/backups.py +++ b/supervisor/api/backups.py @@ -28,6 +28,7 @@ from ..const import ( ATTR_SIZE, ATTR_SLUG, ATTR_SUPERVISOR_VERSION, + ATTR_TIMEOUT, ATTR_TYPE, ATTR_VERSION, ) @@ -80,6 +81,12 @@ SCHEMA_OPTIONS = vol.Schema( } ) +SCHEMA_FREEZE = vol.Schema( + { + vol.Optional(ATTR_TIMEOUT): vol.All(int, vol.Range(min=1)), + } +) + class APIBackups(CoreSysAttributes): """Handle RESTful API for backups functions.""" @@ -142,7 +149,7 @@ class APIBackups(CoreSysAttributes): self.sys_backups.save_data() @api_process - async def reload(self, request): + async def reload(self, _): """Reload backup list.""" await asyncio.shield(self.sys_backups.reload()) return True @@ -233,6 +240,17 @@ class APIBackups(CoreSysAttributes): return await asyncio.shield(self.sys_backups.do_restore_partial(backup, **body)) + @api_process + async def freeze(self, request): + """Initiate manual freeze for external backup.""" + body = await api_validate(SCHEMA_FREEZE, request) + await asyncio.shield(self.sys_backups.freeze_all(**body)) + + @api_process + async def thaw(self, request): + """Begin thaw after manual freeze.""" + await self.sys_backups.thaw_all() + @api_process async def remove(self, request): """Remove a backup.""" diff --git a/supervisor/backups/const.py b/supervisor/backups/const.py index c4b5e593e..085b212e2 100644 --- a/supervisor/backups/const.py +++ b/supervisor/backups/const.py @@ -2,6 +2,7 @@ from enum import StrEnum BUF_SIZE = 2**20 * 4 # 4MB +DEFAULT_FREEZE_TIMEOUT = 600 class BackupType(StrEnum): diff --git a/supervisor/backups/manager.py b/supervisor/backups/manager.py index e49405791..4b2976b94 100644 --- a/supervisor/backups/manager.py +++ b/supervisor/backups/manager.py @@ -13,17 +13,18 @@ from ..const import ( FOLDER_HOMEASSISTANT, CoreState, ) -from ..coresys import CoreSysAttributes from ..dbus.const import UnitActiveState -from ..exceptions import AddonsError -from ..jobs.decorator import Job, JobCondition +from ..exceptions import AddonsError, BackupError, BackupJobError +from ..jobs.const import JOB_GROUP_BACKUP_MANAGER, JobCondition, JobExecutionLimit +from ..jobs.decorator import Job +from ..jobs.job_group import JobGroup from ..mounts.mount import Mount from ..utils.common import FileConfiguration from ..utils.dt import utcnow from ..utils.sentinel import DEFAULT from ..utils.sentry import capture_exception from .backup import Backup -from .const import BackupJobStage, BackupType, RestoreJobStage +from .const import DEFAULT_FREEZE_TIMEOUT, BackupJobStage, BackupType, RestoreJobStage from .utils import create_slug from .validate import ALL_FOLDERS, SCHEMA_BACKUPS_CONFIG @@ -42,15 +43,16 @@ def _list_backup_files(path: Path) -> Iterable[Path]: return [] -class BackupManager(FileConfiguration, CoreSysAttributes): +class BackupManager(FileConfiguration, JobGroup): """Manage backups.""" def __init__(self, coresys): """Initialize a backup manager.""" super().__init__(FILE_HASSIO_BACKUPS, SCHEMA_BACKUPS_CONFIG) - self.coresys = coresys + super(FileConfiguration, self).__init__(coresys, JOB_GROUP_BACKUP_MANAGER) self._backups: dict[str, Backup] = {} - self.lock = asyncio.Lock() + self._thaw_task: Awaitable[None] | None = None + self._thaw_event: asyncio.Event = asyncio.Event() @property def list_backups(self) -> set[Backup]: @@ -92,18 +94,27 @@ class BackupManager(FileConfiguration, CoreSysAttributes): def _change_stage( self, - backup: Backup, stage: BackupJobStage | RestoreJobStage, - restore: bool = False, + backup: Backup | None = None, ): """Change the stage of the current job during backup/restore. Must be called from an existing backup/restore job. """ + job_name = self.sys_jobs.current.name + if "restore" in job_name: + action = "Restore" + elif "freeze" in job_name: + action = "Freeze" + elif "thaw" in job_name: + action = "Thaw" + else: + action = "Backup" + _LOGGER.info( - "%s %s starting stage %s", - "Restore" if restore else "Backup", - backup.slug, + "%s %sstarting stage %s", + action, + f"{backup.slug} " if backup else "", stage, ) self.sys_jobs.current.stage = stage @@ -131,9 +142,9 @@ class BackupManager(FileConfiguration, CoreSysAttributes): # Add backup ID to job self.sys_jobs.current.reference = backup.slug - self._change_stage(backup, BackupJobStage.ADDON_REPOSITORIES) + self._change_stage(BackupJobStage.ADDON_REPOSITORIES, backup) backup.store_repositories() - self._change_stage(backup, BackupJobStage.DOCKER_CONFIG) + self._change_stage(BackupJobStage.DOCKER_CONFIG, backup) backup.store_dockerconfig() return backup @@ -228,20 +239,20 @@ class BackupManager(FileConfiguration, CoreSysAttributes): async with backup: # Backup add-ons if addon_list: - self._change_stage(backup, BackupJobStage.ADDONS) + self._change_stage(BackupJobStage.ADDONS, backup) addon_start_tasks = await backup.store_addons(addon_list) # HomeAssistant Folder is for v1 if homeassistant: - self._change_stage(backup, BackupJobStage.HOME_ASSISTANT) + self._change_stage(BackupJobStage.HOME_ASSISTANT, backup) await backup.store_homeassistant() # Backup folders if folder_list: - self._change_stage(backup, BackupJobStage.FOLDERS) + self._change_stage(BackupJobStage.FOLDERS, backup) await backup.store_folders(folder_list) - self._change_stage(backup, BackupJobStage.FINISHING_FILE) + self._change_stage(BackupJobStage.FINISHING_FILE, backup) except Exception as err: # pylint: disable=broad-except _LOGGER.exception("Backup %s error", backup.slug) @@ -251,7 +262,7 @@ class BackupManager(FileConfiguration, CoreSysAttributes): self._backups[backup.slug] = backup if addon_start_tasks: - self._change_stage(backup, BackupJobStage.AWAIT_ADDON_RESTARTS) + self._change_stage(BackupJobStage.AWAIT_ADDON_RESTARTS, backup) # Ignore exceptions from waiting for addon startup, addon errors handled elsewhere await asyncio.gather(*addon_start_tasks, return_exceptions=True) @@ -262,6 +273,8 @@ class BackupManager(FileConfiguration, CoreSysAttributes): @Job( name="backup_manager_full_backup", conditions=[JobCondition.FREE_SPACE, JobCondition.RUNNING], + limit=JobExecutionLimit.GROUP_ONCE, + on_condition=BackupJobError, ) async def do_backup_full( self, @@ -271,26 +284,23 @@ class BackupManager(FileConfiguration, CoreSysAttributes): location: Mount | type[DEFAULT] | None = DEFAULT, ) -> Backup | None: """Create a full backup.""" - if self.lock.locked(): - _LOGGER.error("A backup/restore process is already running") - return None - backup = self._create_backup( name, BackupType.FULL, password, compressed, location ) _LOGGER.info("Creating new full backup with slug %s", backup.slug) - async with self.lock: - backup = await self._do_backup( - backup, self.sys_addons.installed, ALL_FOLDERS, True - ) - if backup: - _LOGGER.info("Creating full backup with slug %s completed", backup.slug) - return backup + backup = await self._do_backup( + backup, self.sys_addons.installed, ALL_FOLDERS, True + ) + if backup: + _LOGGER.info("Creating full backup with slug %s completed", backup.slug) + return backup @Job( name="backup_manager_partial_backup", conditions=[JobCondition.FREE_SPACE, JobCondition.RUNNING], + limit=JobExecutionLimit.GROUP_ONCE, + on_condition=BackupJobError, ) async def do_backup_partial( self, @@ -303,10 +313,6 @@ class BackupManager(FileConfiguration, CoreSysAttributes): location: Mount | type[DEFAULT] | None = DEFAULT, ) -> Backup | None: """Create a partial backup.""" - if self.lock.locked(): - _LOGGER.error("A backup/restore process is already running") - return None - addons = addons or [] folders = folders or [] @@ -323,21 +329,18 @@ class BackupManager(FileConfiguration, CoreSysAttributes): ) _LOGGER.info("Creating new partial backup with slug %s", backup.slug) - async with self.lock: - addon_list = [] - for addon_slug in addons: - addon = self.sys_addons.get(addon_slug) - if addon and addon.is_installed: - addon_list.append(addon) - continue - _LOGGER.warning("Add-on %s not found/installed", addon_slug) + addon_list = [] + for addon_slug in addons: + addon = self.sys_addons.get(addon_slug) + if addon and addon.is_installed: + addon_list.append(addon) + continue + _LOGGER.warning("Add-on %s not found/installed", addon_slug) - backup = await self._do_backup(backup, addon_list, folders, homeassistant) - if backup: - _LOGGER.info( - "Creating partial backup with slug %s completed", backup.slug - ) - return backup + backup = await self._do_backup(backup, addon_list, folders, homeassistant) + if backup: + _LOGGER.info("Creating partial backup with slug %s completed", backup.slug) + return backup async def _do_restore( self, @@ -357,26 +360,22 @@ class BackupManager(FileConfiguration, CoreSysAttributes): task_hass: asyncio.Task | None = None async with backup: # Restore docker config - self._change_stage(backup, RestoreJobStage.DOCKER_CONFIG, restore=True) + self._change_stage(RestoreJobStage.DOCKER_CONFIG, backup) backup.restore_dockerconfig(replace) # Process folders if folder_list: - self._change_stage(backup, RestoreJobStage.FOLDERS, restore=True) + self._change_stage(RestoreJobStage.FOLDERS, backup) await backup.restore_folders(folder_list) # Process Home-Assistant if homeassistant: - self._change_stage( - backup, RestoreJobStage.HOME_ASSISTANT, restore=True - ) + self._change_stage(RestoreJobStage.HOME_ASSISTANT, backup) task_hass = await backup.restore_homeassistant() # Delete delta add-ons if replace: - self._change_stage( - backup, RestoreJobStage.REMOVE_DELTA_ADDONS, restore=True - ) + self._change_stage(RestoreJobStage.REMOVE_DELTA_ADDONS, backup) for addon in self.sys_addons.installed: if addon.slug in backup.addon_list: continue @@ -389,20 +388,16 @@ class BackupManager(FileConfiguration, CoreSysAttributes): _LOGGER.warning("Can't uninstall Add-on %s", addon.slug) if addon_list: - self._change_stage( - backup, RestoreJobStage.ADDON_REPOSITORIES, restore=True - ) + self._change_stage(RestoreJobStage.ADDON_REPOSITORIES, backup) await backup.restore_repositories(replace) - self._change_stage(backup, RestoreJobStage.ADDONS, restore=True) + self._change_stage(RestoreJobStage.ADDONS, backup) addon_start_tasks = await backup.restore_addons(addon_list) # Wait for Home Assistant Core update/downgrade if task_hass: self._change_stage( - backup, - RestoreJobStage.AWAIT_HOME_ASSISTANT_RESTART, - restore=True, + RestoreJobStage.AWAIT_HOME_ASSISTANT_RESTART, backup ) await task_hass @@ -412,9 +407,7 @@ class BackupManager(FileConfiguration, CoreSysAttributes): return False else: if addon_start_tasks: - self._change_stage( - backup, RestoreJobStage.AWAIT_ADDON_RESTARTS, restore=True - ) + self._change_stage(RestoreJobStage.AWAIT_ADDON_RESTARTS, backup) # Ignore exceptions from waiting for addon startup, addon errors handled elsewhere await asyncio.gather(*addon_start_tasks, return_exceptions=True) @@ -422,9 +415,7 @@ class BackupManager(FileConfiguration, CoreSysAttributes): finally: # Leave Home Assistant alone if it wasn't part of the restore if homeassistant: - self._change_stage( - backup, RestoreJobStage.CHECK_HOME_ASSISTANT, restore=True - ) + self._change_stage(RestoreJobStage.CHECK_HOME_ASSISTANT, backup) # Do we need start Home Assistant Core? if not await self.sys_homeassistant.core.is_running(): @@ -444,6 +435,8 @@ class BackupManager(FileConfiguration, CoreSysAttributes): JobCondition.INTERNET_SYSTEM, JobCondition.RUNNING, ], + limit=JobExecutionLimit.GROUP_ONCE, + on_condition=BackupJobError, ) async def do_restore_full( self, backup: Backup, password: str | None = None @@ -452,10 +445,6 @@ class BackupManager(FileConfiguration, CoreSysAttributes): # Add backup ID to job self.sys_jobs.current.reference = backup.slug - if self.lock.locked(): - _LOGGER.error("A backup/restore process is already running") - return False - if backup.sys_type != BackupType.FULL: _LOGGER.error("%s is only a partial backup!", backup.slug) return False @@ -473,21 +462,20 @@ class BackupManager(FileConfiguration, CoreSysAttributes): return False _LOGGER.info("Full-Restore %s start", backup.slug) - async with self.lock: - self.sys_core.state = CoreState.FREEZE + self.sys_core.state = CoreState.FREEZE - # Stop Home-Assistant / Add-ons - await self.sys_core.shutdown() + # Stop Home-Assistant / Add-ons + await self.sys_core.shutdown() - success = await self._do_restore( - backup, backup.addon_list, backup.folders, True, True - ) + success = await self._do_restore( + backup, backup.addon_list, backup.folders, True, True + ) - self.sys_core.state = CoreState.RUNNING + self.sys_core.state = CoreState.RUNNING - if success: - _LOGGER.info("Full-Restore %s done", backup.slug) - return success + if success: + _LOGGER.info("Full-Restore %s done", backup.slug) + return success @Job( name="backup_manager_partial_restore", @@ -498,6 +486,8 @@ class BackupManager(FileConfiguration, CoreSysAttributes): JobCondition.INTERNET_SYSTEM, JobCondition.RUNNING, ], + limit=JobExecutionLimit.GROUP_ONCE, + on_condition=BackupJobError, ) async def do_restore_partial( self, @@ -511,10 +501,6 @@ class BackupManager(FileConfiguration, CoreSysAttributes): # Add backup ID to job self.sys_jobs.current.reference = backup.slug - if self.lock.locked(): - _LOGGER.error("A backup/restore process is already running") - return False - addon_list = addons or [] folder_list = folders or [] @@ -540,15 +526,100 @@ class BackupManager(FileConfiguration, CoreSysAttributes): return False _LOGGER.info("Partial-Restore %s start", backup.slug) - async with self.lock: - self.sys_core.state = CoreState.FREEZE + self.sys_core.state = CoreState.FREEZE - success = await self._do_restore( - backup, addon_list, folder_list, homeassistant, False + success = await self._do_restore( + backup, addon_list, folder_list, homeassistant, False + ) + + self.sys_core.state = CoreState.RUNNING + + if success: + _LOGGER.info("Partial-Restore %s done", backup.slug) + return success + + @Job( + name="backup_manager_freeze_all", + conditions=[JobCondition.RUNNING], + limit=JobExecutionLimit.GROUP_ONCE, + on_condition=BackupJobError, + ) + async def freeze_all(self, timeout: float = DEFAULT_FREEZE_TIMEOUT) -> None: + """Freeze system to prepare for an external backup such as an image snapshot.""" + self.sys_core.state = CoreState.FREEZE + + # Determine running addons + installed = self.sys_addons.installed.copy() + is_running: list[bool] = await asyncio.gather( + *[addon.is_running() for addon in installed] + ) + running_addons = [ + installed[ind] for ind in range(len(installed)) if is_running[ind] + ] + + # Create thaw task first to ensure we eventually undo freezes even if the below fails + self._thaw_task = asyncio.shield( + self.sys_create_task(self._thaw_all(running_addons, timeout)) + ) + + # Tell Home Assistant to freeze for a backup + self._change_stage(BackupJobStage.HOME_ASSISTANT) + await self.sys_homeassistant.begin_backup() + + # Run all pre-backup tasks for addons + self._change_stage(BackupJobStage.ADDONS) + await asyncio.gather(*[addon.begin_backup() for addon in running_addons]) + + @Job( + name="backup_manager_thaw_all", + conditions=[JobCondition.FROZEN], + on_condition=BackupJobError, + ) + async def _thaw_all( + self, running_addons: list[Addon], timeout: float = DEFAULT_FREEZE_TIMEOUT + ) -> None: + """Thaw system after user signal or timeout.""" + try: + try: + await asyncio.wait_for(self._thaw_event.wait(), timeout) + except asyncio.TimeoutError: + _LOGGER.warning( + "Timeout waiting for signal to thaw after manual freeze, beginning thaw now" + ) + + self._change_stage(BackupJobStage.HOME_ASSISTANT) + await self.sys_homeassistant.end_backup() + + self._change_stage(BackupJobStage.ADDONS) + addon_start_tasks: list[Awaitable[None]] = [ + task + for task in await asyncio.gather( + *[addon.end_backup() for addon in running_addons] + ) + if task + ] + finally: + self.sys_core.state = CoreState.RUNNING + self._thaw_event.clear() + self._thaw_task = None + + if addon_start_tasks: + self._change_stage(BackupJobStage.AWAIT_ADDON_RESTARTS) + await asyncio.gather(*addon_start_tasks, return_exceptions=True) + + @Job( + name="backup_manager_signal_thaw", + conditions=[JobCondition.FROZEN], + limit=JobExecutionLimit.GROUP_ONCE, + on_condition=BackupJobError, + internal=True, + ) + async def thaw_all(self) -> None: + """Signal thaw task to begin unfreezing the system.""" + if not self._thaw_task: + raise BackupError( + "Freeze was not initiated by freeze API, cannot thaw this way" ) - self.sys_core.state = CoreState.RUNNING - - if success: - _LOGGER.info("Partial-Restore %s done", backup.slug) - return success + self._thaw_event.set() + await self._thaw_task diff --git a/supervisor/coresys.py b/supervisor/coresys.py index 079299e90..82c775040 100644 --- a/supervisor/coresys.py +++ b/supervisor/coresys.py @@ -3,6 +3,7 @@ from __future__ import annotations import asyncio from collections.abc import Callable, Coroutine +from contextvars import Context, copy_context from datetime import datetime from functools import partial import logging @@ -99,6 +100,9 @@ class CoreSys: {aiohttp.hdrs.USER_AGENT: SERVER_SOFTWARE} ) + # Task factory attributes + self._set_task_context: list[Callable[[Context], Context]] = [] + @property def dev(self) -> bool: """Return True if we run dev mode.""" @@ -520,6 +524,17 @@ class CoreSys: """Return now in local timezone.""" return datetime.now(get_time_zone(self.timezone) or UTC) + def add_set_task_context_callback( + self, callback: Callable[[Context], Context] + ) -> None: + """Add callback used to modify context prior to creating a task. + + Only used for tasks created via CoreSys.create_task. Callback can modify the provided + context using context.run (ex. `context.run(var.set, "new_value")`). Callback should + return the context to be provided to task. + """ + self._set_task_context.append(callback) + def run_in_executor( self, funct: Callable[..., T], *args: tuple[Any], **kwargs: dict[str, Any] ) -> Coroutine[Any, Any, T]: @@ -531,7 +546,11 @@ class CoreSys: def create_task(self, coroutine: Coroutine) -> asyncio.Task: """Create an async task.""" - return self.loop.create_task(coroutine) + context = copy_context() + for callback in self._set_task_context: + context = callback(context) + + return self.loop.create_task(coroutine, context=context) class CoreSysAttributes: diff --git a/supervisor/exceptions.py b/supervisor/exceptions.py index 29fff5437..0b44f63bf 100644 --- a/supervisor/exceptions.py +++ b/supervisor/exceptions.py @@ -581,6 +581,10 @@ class HomeAssistantBackupError(BackupError, HomeAssistantError): """Raise if an error during Home Assistant Core backup is happening.""" +class BackupJobError(BackupError, JobException): + """Raise on Backup job error.""" + + # Security diff --git a/supervisor/homeassistant/module.py b/supervisor/homeassistant/module.py index 4b03ca212..df9bcd4e9 100644 --- a/supervisor/homeassistant/module.py +++ b/supervisor/homeassistant/module.py @@ -306,62 +306,70 @@ class HomeAssistant(FileConfiguration, CoreSysAttributes): self.sys_homeassistant.websocket.send_message({ATTR_TYPE: "usb/scan"}) - @Job(name="home_assistant_module_backup") - async def backup(self, tar_file: tarfile.TarFile) -> None: - """Backup Home Assistant Core config/ directory.""" - - # Let Home Assistant Core know we are about to backup + @Job(name="home_assistant_module_begin_backup") + async def begin_backup(self) -> None: + """Inform Home Assistant a backup is beginning.""" try: await self.websocket.async_send_command({ATTR_TYPE: WSType.BACKUP_START}) - except HomeAssistantWSError: _LOGGER.warning( "Preparing backup of Home Assistant Core failed. Check HA Core logs." ) - with TemporaryDirectory(dir=self.sys_config.path_tmp) as temp: - temp_path = Path(temp) + @Job(name="home_assistant_module_end_backup") + async def end_backup(self) -> None: + """Inform Home Assistant the backup is ending.""" + try: + await self.websocket.async_send_command({ATTR_TYPE: WSType.BACKUP_END}) + except HomeAssistantWSError: + _LOGGER.warning( + "Error during Home Assistant Core backup. Check HA Core logs." + ) - # Store local configs/state - try: - write_json_file(temp_path.joinpath("homeassistant.json"), self._data) - except ConfigurationFileError as err: - raise HomeAssistantError( - f"Can't save meta for Home Assistant Core: {err!s}", _LOGGER.error - ) from err + @Job(name="home_assistant_module_backup") + async def backup(self, tar_file: tarfile.TarFile) -> None: + """Backup Home Assistant Core config/ directory.""" + await self.begin_backup() + try: + with TemporaryDirectory(dir=self.sys_config.path_tmp) as temp: + temp_path = Path(temp) - # Backup data config folder - def _write_tarfile(): - with tar_file as backup: - # Backup metadata - backup.add(temp, arcname=".") - - # Backup data - atomic_contents_add( - backup, - self.sys_config.path_homeassistant, - excludes=HOMEASSISTANT_BACKUP_EXCLUDE, - arcname="data", - ) - - try: - _LOGGER.info("Backing up Home Assistant Core config folder") - await self.sys_run_in_executor(_write_tarfile) - _LOGGER.info("Backup Home Assistant Core config folder done") - except (tarfile.TarError, OSError) as err: - raise HomeAssistantBackupError( - f"Can't backup Home Assistant Core config folder: {str(err)}", - _LOGGER.error, - ) from err - finally: + # Store local configs/state try: - await self.sys_homeassistant.websocket.async_send_command( - {ATTR_TYPE: WSType.BACKUP_END} - ) - except HomeAssistantWSError: - _LOGGER.warning( - "Error during Home Assistant Core backup. Check HA Core logs." + write_json_file( + temp_path.joinpath("homeassistant.json"), self._data ) + except ConfigurationFileError as err: + raise HomeAssistantError( + f"Can't save meta for Home Assistant Core: {err!s}", + _LOGGER.error, + ) from err + + # Backup data config folder + def _write_tarfile(): + with tar_file as backup: + # Backup metadata + backup.add(temp, arcname=".") + + # Backup data + atomic_contents_add( + backup, + self.sys_config.path_homeassistant, + excludes=HOMEASSISTANT_BACKUP_EXCLUDE, + arcname="data", + ) + + try: + _LOGGER.info("Backing up Home Assistant Core config folder") + await self.sys_run_in_executor(_write_tarfile) + _LOGGER.info("Backup Home Assistant Core config folder done") + except (tarfile.TarError, OSError) as err: + raise HomeAssistantBackupError( + f"Can't backup Home Assistant Core config folder: {str(err)}", + _LOGGER.error, + ) from err + finally: + await self.end_backup() async def restore(self, tar_file: tarfile.TarFile) -> None: """Restore Home Assistant Core config/ directory.""" diff --git a/supervisor/jobs/__init__.py b/supervisor/jobs/__init__.py index fb8918e86..b987c1a7d 100644 --- a/supervisor/jobs/__init__.py +++ b/supervisor/jobs/__init__.py @@ -1,7 +1,7 @@ """Supervisor job manager.""" from collections.abc import Callable from contextlib import contextmanager -from contextvars import ContextVar, Token +from contextvars import Context, ContextVar, Token import logging from typing import Any from uuid import UUID, uuid4 @@ -27,6 +27,12 @@ _CURRENT_JOB: ContextVar[UUID] = ContextVar("current_job") _LOGGER: logging.Logger = logging.getLogger(__name__) +def _remove_current_job(context: Context) -> Context: + """Remove the current job from the context.""" + context.run(_CURRENT_JOB.set, None) + return context + + def _invalid_if_done(instance: "SupervisorJob", *_) -> None: """Validate that job is not done.""" if instance.done: @@ -114,6 +120,9 @@ class JobManager(FileConfiguration, CoreSysAttributes): self.coresys: CoreSys = coresys self._jobs: dict[str, SupervisorJob] = {} + # Ensure tasks created via CoreSys.create_task do not have a parent + self.coresys.add_set_task_context_callback(_remove_current_job) + @property def jobs(self) -> list[SupervisorJob]: """Return a list of current jobs.""" diff --git a/supervisor/jobs/const.py b/supervisor/jobs/const.py index 3b1ea0e8b..0528d143e 100644 --- a/supervisor/jobs/const.py +++ b/supervisor/jobs/const.py @@ -9,6 +9,7 @@ FILE_CONFIG_JOBS = Path(SUPERVISOR_DATA, "jobs.json") ATTR_IGNORE_CONDITIONS = "ignore_conditions" JOB_GROUP_ADDON = "addon_{slug}" +JOB_GROUP_BACKUP_MANAGER = "backup_manager" JOB_GROUP_DOCKER_INTERFACE = "container_{name}" JOB_GROUP_HOME_ASSISTANT_CORE = "home_assistant_core" @@ -18,6 +19,7 @@ class JobCondition(StrEnum): AUTO_UPDATE = "auto_update" FREE_SPACE = "free_space" + FROZEN = "frozen" HAOS = "haos" HEALTHY = "healthy" HOST_NETWORK = "host_network" diff --git a/supervisor/jobs/decorator.py b/supervisor/jobs/decorator.py index 3999c6015..a946f6560 100644 --- a/supervisor/jobs/decorator.py +++ b/supervisor/jobs/decorator.py @@ -307,6 +307,14 @@ class Job(CoreSysAttributes): f"'{self._method.__qualname__}' blocked from execution, system is not running - {self.sys_core.state!s}" ) + if ( + JobCondition.FROZEN in used_conditions + and self.sys_core.state != CoreState.FREEZE + ): + raise JobConditionException( + f"'{self._method.__qualname__}' blocked from execution, system is not frozen - {self.sys_core.state!s}" + ) + if ( JobCondition.FREE_SPACE in used_conditions and self.sys_host.info.free_space < MINIMUM_FREE_SPACE_THRESHOLD diff --git a/tests/api/test_backups.py b/tests/api/test_backups.py index 2a57f913c..d6003ba6c 100644 --- a/tests/api/test_backups.py +++ b/tests/api/test_backups.py @@ -1,9 +1,11 @@ """Test backups API.""" +import asyncio from pathlib import Path, PurePath -from unittest.mock import patch +from unittest.mock import AsyncMock, patch from aiohttp.test_utils import TestClient +from awesomeversion import AwesomeVersion import pytest from supervisor.backups.backup import Backup @@ -135,3 +137,33 @@ async def test_backup_to_default( slug = result["data"]["slug"] assert (mount_dir / f"{slug}.tar").exists() + + +async def test_api_freeze_thaw( + api_client: TestClient, + coresys: CoreSys, + ha_ws_client: AsyncMock, + tmp_supervisor_data, + path_extern, +): + """Test manual freeze and thaw for external backup via API.""" + coresys.core.state = CoreState.RUNNING + coresys.hardware.disk.get_disk_free_space = lambda x: 5000 + ha_ws_client.ha_version = AwesomeVersion("2022.1.0") + + await api_client.post("/backups/freeze") + assert coresys.core.state == CoreState.FREEZE + await asyncio.sleep(0) + assert any( + call.args[0] == {"type": "backup/start"} + for call in ha_ws_client.async_send_command.call_args_list + ) + + ha_ws_client.async_send_command.reset_mock() + await api_client.post("/backups/thaw") + assert coresys.core.state == CoreState.RUNNING + await asyncio.sleep(0) + assert any( + call.args[0] == {"type": "backup/end"} + for call in ha_ws_client.async_send_command.call_args_list + ) diff --git a/tests/backups/test_manager.py b/tests/backups/test_manager.py index 3d3449e9e..d38e8a465 100644 --- a/tests/backups/test_manager.py +++ b/tests/backups/test_manager.py @@ -20,7 +20,7 @@ from supervisor.docker.addon import DockerAddon from supervisor.docker.const import ContainerState from supervisor.docker.homeassistant import DockerHomeAssistant from supervisor.docker.monitor import DockerContainerStateEvent -from supervisor.exceptions import AddonsError, DockerError +from supervisor.exceptions import AddonsError, BackupError, DockerError from supervisor.homeassistant.core import HomeAssistantCore from supervisor.homeassistant.module import HomeAssistant from supervisor.mounts.mount import Mount @@ -832,8 +832,7 @@ async def test_restore_with_healthcheck( def _make_backup_message_for_assert( *, - full: bool = True, - restore: bool = False, + action: str = "full_backup", reference: str, stage: str | None, done: bool = False, @@ -844,7 +843,7 @@ def _make_backup_message_for_assert( "data": { "event": "job", "data": { - "name": f"backup_manager_{'full' if full else 'partial'}_{'restore' if restore else 'backup'}", + "name": f"backup_manager_{action}", "reference": reference, "uuid": ANY, "progress": 0, @@ -920,27 +919,35 @@ async def test_backup_progress( == "backup_manager_partial_backup" ] assert messages == [ - _make_backup_message_for_assert(full=False, reference=None, stage=None), _make_backup_message_for_assert( - full=False, reference=partial_backup.slug, stage=None + action="partial_backup", reference=None, stage=None ), _make_backup_message_for_assert( - full=False, reference=partial_backup.slug, stage="addon_repositories" + action="partial_backup", reference=partial_backup.slug, stage=None ), _make_backup_message_for_assert( - full=False, reference=partial_backup.slug, stage="docker_config" + action="partial_backup", + reference=partial_backup.slug, + stage="addon_repositories", ), _make_backup_message_for_assert( - full=False, reference=partial_backup.slug, stage="addons" + action="partial_backup", + reference=partial_backup.slug, + stage="docker_config", ), _make_backup_message_for_assert( - full=False, reference=partial_backup.slug, stage="folders" + action="partial_backup", reference=partial_backup.slug, stage="addons" ), _make_backup_message_for_assert( - full=False, reference=partial_backup.slug, stage="finishing_file" + action="partial_backup", reference=partial_backup.slug, stage="folders" ), _make_backup_message_for_assert( - full=False, + action="partial_backup", + reference=partial_backup.slug, + stage="finishing_file", + ), + _make_backup_message_for_assert( + action="partial_backup", reference=partial_backup.slug, stage="finishing_file", done=True, @@ -987,51 +994,53 @@ async def test_restore_progress( == "backup_manager_full_restore" ] assert messages == [ - _make_backup_message_for_assert(restore=True, reference=None, stage=None), _make_backup_message_for_assert( - restore=True, reference=full_backup.slug, stage=None + action="full_restore", reference=None, stage=None ), _make_backup_message_for_assert( - restore=True, reference=full_backup.slug, stage="docker_config" + action="full_restore", reference=full_backup.slug, stage=None ), _make_backup_message_for_assert( - restore=True, reference=full_backup.slug, stage="folders" + action="full_restore", reference=full_backup.slug, stage="docker_config" ), _make_backup_message_for_assert( - restore=True, + action="full_restore", reference=full_backup.slug, stage="folders" + ), + _make_backup_message_for_assert( + action="full_restore", reference=full_backup.slug, stage="home_assistant", ), _make_backup_message_for_assert( - restore=True, + action="full_restore", reference=full_backup.slug, stage="remove_delta_addons", ), _make_backup_message_for_assert( - restore=True, + action="full_restore", reference=full_backup.slug, stage="addon_repositories", ), _make_backup_message_for_assert( - restore=True, reference=full_backup.slug, stage="addons" + action="full_restore", reference=full_backup.slug, stage="addons" ), _make_backup_message_for_assert( - restore=True, + action="full_restore", reference=full_backup.slug, stage="await_home_assistant_restart", ), _make_backup_message_for_assert( - restore=True, + action="full_restore", reference=full_backup.slug, stage="await_addon_restarts", ), _make_backup_message_for_assert( - restore=True, + action="full_restore", reference=full_backup.slug, stage="check_home_assistant", ), _make_backup_message_for_assert( - restore=True, + action="full_restore", reference=full_backup.slug, stage="check_home_assistant", done=True, @@ -1055,29 +1064,25 @@ async def test_restore_progress( ] assert messages == [ _make_backup_message_for_assert( - full=False, restore=True, reference=None, stage=None + action="partial_restore", reference=None, stage=None ), _make_backup_message_for_assert( - full=False, - restore=True, + action="partial_restore", reference=folders_backup.slug, stage=None, ), _make_backup_message_for_assert( - full=False, - restore=True, + action="partial_restore", reference=folders_backup.slug, stage="docker_config", ), _make_backup_message_for_assert( - full=False, - restore=True, + action="partial_restore", reference=folders_backup.slug, stage="folders", ), _make_backup_message_for_assert( - full=False, - restore=True, + action="partial_restore", reference=folders_backup.slug, stage="folders", done=True, @@ -1103,37 +1108,151 @@ async def test_restore_progress( ] assert messages == [ _make_backup_message_for_assert( - full=False, restore=True, reference=None, stage=None + action="partial_restore", reference=None, stage=None ), _make_backup_message_for_assert( - full=False, - restore=True, + action="partial_restore", reference=addon_backup.slug, stage=None, ), _make_backup_message_for_assert( - full=False, - restore=True, + action="partial_restore", reference=addon_backup.slug, stage="docker_config", ), _make_backup_message_for_assert( - full=False, - restore=True, + action="partial_restore", reference=addon_backup.slug, stage="addon_repositories", ), _make_backup_message_for_assert( - full=False, - restore=True, + action="partial_restore", reference=addon_backup.slug, stage="addons", ), _make_backup_message_for_assert( - full=False, - restore=True, + action="partial_restore", reference=addon_backup.slug, stage="addons", done=True, ), ] + + +async def test_freeze_thaw( + coresys: CoreSys, + install_addon_ssh: Addon, + container: MagicMock, + ha_ws_client: AsyncMock, + tmp_supervisor_data, + path_extern, +): + """Test manual freeze and thaw for external snapshots.""" + container.status = "running" + install_addon_ssh.path_data.mkdir() + coresys.core.state = CoreState.RUNNING + coresys.hardware.disk.get_disk_free_space = lambda x: 5000 + container.exec_run.return_value = (0, None) + ha_ws_client.ha_version = AwesomeVersion("2022.1.0") + + with patch.object( + AddonModel, "backup_pre", new=PropertyMock(return_value="pre_backup") + ), patch.object( + AddonModel, "backup_post", new=PropertyMock(return_value="post_backup") + ): + # Run the freeze + await coresys.backups.freeze_all() + container.exec_run.assert_called_once_with("pre_backup") + assert coresys.core.state == CoreState.FREEZE + + await asyncio.sleep(0) + messages = [ + call.args[0] + for call in ha_ws_client.async_send_command.call_args_list + if call.args[0]["type"] in ["backup/start", "backup/end"] + or call.args[0]["data"].get("data", {}).get("name") + in ["backup_manager_freeze_all", "backup_manager_thaw_all"] + ] + assert messages == [ + _make_backup_message_for_assert( + action="freeze_all", reference=None, stage=None + ), + {"type": "backup/start"}, + _make_backup_message_for_assert( + action="freeze_all", reference=None, stage="home_assistant" + ), + _make_backup_message_for_assert( + action="freeze_all", reference=None, stage="addons" + ), + _make_backup_message_for_assert( + action="thaw_all", reference=None, stage=None + ), + _make_backup_message_for_assert( + action="freeze_all", reference=None, stage="addons", done=True + ), + ] + + # Release the thaw task + container.exec_run.reset_mock() + ha_ws_client.async_send_command.reset_mock() + await coresys.backups.thaw_all() + container.exec_run.assert_called_once_with("post_backup") + assert coresys.core.state == CoreState.RUNNING + + await asyncio.sleep(0) + messages = [ + call.args[0] + for call in ha_ws_client.async_send_command.call_args_list + if call.args[0]["type"] in ["backup/start", "backup/end"] + or call.args[0]["data"].get("data", {}).get("name") + in ["backup_manager_freeze_all", "backup_manager_thaw_all"] + ] + assert messages == [ + {"type": "backup/end"}, + _make_backup_message_for_assert( + action="thaw_all", reference=None, stage="home_assistant" + ), + _make_backup_message_for_assert( + action="thaw_all", reference=None, stage="addons" + ), + _make_backup_message_for_assert( + action="thaw_all", reference=None, stage="addons", done=True + ), + ] + + +async def test_freeze_thaw_timeout( + coresys: CoreSys, + ha_ws_client: AsyncMock, + caplog: pytest.LogCaptureFixture, + tmp_supervisor_data, + path_extern, +): + """Test manual freeze ends due to timeout expiration.""" + coresys.core.state = CoreState.RUNNING + coresys.hardware.disk.get_disk_free_space = lambda x: 5000 + ha_ws_client.ha_version = AwesomeVersion("2022.1.0") + + await coresys.backups.freeze_all(timeout=0.01) + assert coresys.core.state == CoreState.FREEZE + await asyncio.sleep(0) + assert any( + call.args[0] == {"type": "backup/start"} + for call in ha_ws_client.async_send_command.call_args_list + ) + + ha_ws_client.async_send_command.reset_mock() + await asyncio.sleep(0.02) + assert coresys.core.state == CoreState.RUNNING + assert any( + call.args[0] == {"type": "backup/end"} + for call in ha_ws_client.async_send_command.call_args_list + ) + assert "Timeout waiting for signal to thaw after manual freeze" in caplog.text + + +async def test_cannot_manually_thaw_normal_freeze(coresys: CoreSys): + """Test thaw_all cannot be used unless freeze was started by freeze_all method.""" + coresys.core.state = CoreState.FREEZE + with pytest.raises(BackupError): + await coresys.backups.thaw_all() diff --git a/tests/jobs/test_job_decorator.py b/tests/jobs/test_job_decorator.py index 3ea552e31..ffa5e73d0 100644 --- a/tests/jobs/test_job_decorator.py +++ b/tests/jobs/test_job_decorator.py @@ -990,3 +990,57 @@ async def test_internal_jobs_no_notify(coresys: CoreSys): }, } ) + + +async def test_job_starting_separate_task(coresys: CoreSys): + """Test job that starts a job as a separate asyncio task.""" + + class TestClass(JobGroup): + """Test class.""" + + def __init__(self, coresys: CoreSys) -> None: + super().__init__(coresys, "test_class_locking") + self.event = asyncio.Event() + + @Job( + name="test_job_starting_separate_task_job_task", + limit=JobExecutionLimit.GROUP_ONCE, + ) + async def job_task(self): + """Create a separate long running job task.""" + self.sys_jobs.current.stage = "launch_task" + return self.sys_create_task(self.job_task_inner()) + + @Job(name="test_job_starting_separate_task_job_task_inner") + async def job_task_inner(self): + """Check & update job and wait for release.""" + assert self.sys_jobs.current.parent_id is None + self.sys_jobs.current.stage = "start" + await self.event.wait() + self.sys_jobs.current.stage = "end" + + @Job(name="test_job_starting_separate_task_release") + async def job_release(self): + """Release inner task.""" + self.event.set() + + @Job( + name="test_job_starting_separate_task_job_await", + limit=JobExecutionLimit.GROUP_ONCE, + ) + async def job_await(self): + """Await a simple job in same group to confirm lock released.""" + await self.job_await_inner() + + @Job(name="test_job_starting_separate_task_job_await_inner") + async def job_await_inner(self): + """Confirm there is a parent this way.""" + assert self.sys_jobs.current.parent_id is not None + + test = TestClass(coresys) + + task = await test.job_task() + await asyncio.sleep(0) + await test.job_await() + await test.job_release() + await task