From 606db3585cb07a7e04a309fbfd540403da1a4e86 Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Tue, 18 Feb 2025 20:59:09 +0100 Subject: [PATCH] Remove I/O in event loop for backup create and restore operations (#5634) * Remove I/O from backup create() function * Move mount check into executor thread * Remove I/O from backup open() function * Remove I/O from _folder_save() * Refactor remove_folder and remove_folder_with_excludes Make remove_folder and remove_folder_with_excludes synchronous functions which need to be run in an executor thread to be safely used in asyncio. This makes them better composable with other I/O operations like checking for file existence etc. * Fix logger typo * Use return values for functions running in an executor * Move location check into a separate function * Fix extract --- supervisor/backups/backup.py | 123 ++++++++------ supervisor/backups/manager.py | 36 +++- supervisor/homeassistant/module.py | 156 +++++++++--------- .../resolution/fixups/store_execute_reset.py | 2 +- supervisor/store/git.py | 10 +- supervisor/utils/__init__.py | 69 ++++---- tests/utils/test_remove_folder.py | 13 +- 7 files changed, 219 insertions(+), 190 deletions(-) diff --git a/supervisor/backups/backup.py b/supervisor/backups/backup.py index 49d67a845..c1ba7b4fc 100644 --- a/supervisor/backups/backup.py +++ b/supervisor/backups/backup.py @@ -94,7 +94,7 @@ class Backup(JobGroup): coresys, JOB_GROUP_BACKUP.format_map(defaultdict(str, slug=slug)), slug ) self._data: dict[str, Any] = data or {ATTR_SLUG: slug} - self._tmp = None + self._tmp: TemporaryDirectory = None self._outer_secure_tarfile: SecureTarFile | None = None self._key: bytes | None = None self._aes: Cipher | None = None @@ -463,23 +463,31 @@ class Backup(JobGroup): @asynccontextmanager async def create(self) -> AsyncGenerator[None]: """Create new backup file.""" - if self.tarfile.is_file(): - raise BackupError( - f"Cannot make new backup at {self.tarfile.as_posix()}, file already exists!", - 
_LOGGER.error, - ) - self._outer_secure_tarfile = SecureTarFile( - self.tarfile, - "w", - gzip=False, - bufsize=BUF_SIZE, + def _open_outer_tarfile(): + """Create and open outer tarfile.""" + if self.tarfile.is_file(): + raise BackupError( + f"Cannot make new backup at {self.tarfile.as_posix()}, file already exists!", + _LOGGER.error, + ) + + outer_secure_tarfile = SecureTarFile( + self.tarfile, + "w", + gzip=False, + bufsize=BUF_SIZE, + ) + return outer_secure_tarfile, outer_secure_tarfile.open() + + self._outer_secure_tarfile, outer_tarfile = await self.sys_run_in_executor( + _open_outer_tarfile ) try: - with self._outer_secure_tarfile as outer_tarfile: - yield - await self._create_cleanup(outer_tarfile) + yield finally: + await self._create_cleanup(outer_tarfile) + await self.sys_run_in_executor(self._outer_secure_tarfile.close) self._outer_secure_tarfile = None @asynccontextmanager @@ -496,28 +504,34 @@ class Backup(JobGroup): if location == DEFAULT else self.all_locations[location][ATTR_PATH] ) - if not backup_tarfile.is_file(): - self.sys_create_task(self.sys_backups.reload(location)) - raise BackupFileNotFoundError( - f"Cannot open backup at {backup_tarfile.as_posix()}, file does not exist!", - _LOGGER.error, - ) # extract an existing backup - self._tmp = TemporaryDirectory(dir=str(backup_tarfile.parent)) - def _extract_backup(): - """Extract a backup.""" + if not backup_tarfile.is_file(): + raise BackupFileNotFoundError( + f"Cannot open backup at {backup_tarfile.as_posix()}, file does not exist!", + _LOGGER.error, + ) + tmp = TemporaryDirectory(dir=str(backup_tarfile.parent)) + with tarfile.open(backup_tarfile, "r:") as tar: tar.extractall( - path=self._tmp.name, + path=tmp.name, members=secure_path(tar), filter="fully_trusted", ) - with self._tmp: - await self.sys_run_in_executor(_extract_backup) + return tmp + + try: + self._tmp = await self.sys_run_in_executor(_extract_backup) yield + except BackupFileNotFoundError as err: + 
self.sys_create_task(self.sys_backups.reload(location)) + raise err + finally: + if self._tmp: + self._tmp.cleanup() async def _create_cleanup(self, outer_tarfile: TarFile) -> None: """Cleanup after backup creation. @@ -669,17 +683,16 @@ class Backup(JobGroup): async def _folder_save(self, name: str): """Take backup of a folder.""" self.sys_jobs.current.reference = name - slug_name = name.replace("/", "_") tar_name = f"{slug_name}.tar{'.gz' if self.compressed else ''}" origin_dir = Path(self.sys_config.path_supervisor, name) - # Check if exists - if not origin_dir.is_dir(): - _LOGGER.warning("Can't find backup folder %s", name) - return + def _save() -> bool: + # Check if exists + if not origin_dir.is_dir(): + _LOGGER.warning("Can't find backup folder %s", name) + return False - def _save() -> None: # Take backup _LOGGER.info("Backing up folder %s", name) @@ -712,16 +725,16 @@ class Backup(JobGroup): ) _LOGGER.info("Backup folder %s done", name) + return True try: - await self.sys_run_in_executor(_save) + if await self.sys_run_in_executor(_save): + self._data[ATTR_FOLDERS].append(name) except (tarfile.TarError, OSError) as err: raise BackupError( f"Can't backup folder {name}: {str(err)}", _LOGGER.error ) from err - self._data[ATTR_FOLDERS].append(name) - @Job(name="backup_store_folders", cleanup=False) async def store_folders(self, folder_list: list[str]): """Backup Supervisor data into backup.""" @@ -740,28 +753,18 @@ class Backup(JobGroup): ) origin_dir = Path(self.sys_config.path_supervisor, name) - # Check if exists inside backup - if not tar_name.exists(): - raise BackupInvalidError( - f"Can't find restore folder {name}", _LOGGER.warning - ) - - # Unmount any mounts within folder - bind_mounts = [ - bound.bind_mount - for bound in self.sys_mounts.bound_mounts - if bound.bind_mount.local_where - and bound.bind_mount.local_where.is_relative_to(origin_dir) - ] - if bind_mounts: - await asyncio.gather(*[bind_mount.unmount() for bind_mount in bind_mounts]) - - # 
Clean old stuff - if origin_dir.is_dir(): - await remove_folder(origin_dir, content_only=True) - # Perform a restore def _restore() -> bool: + # Check if exists inside backup + if not tar_name.exists(): + raise BackupInvalidError( + f"Can't find restore folder {name}", _LOGGER.warning + ) + + # Clean old stuff + if origin_dir.is_dir(): + remove_folder(origin_dir, content_only=True) + try: _LOGGER.info("Restore folder %s", name) with SecureTarFile( @@ -781,6 +784,16 @@ class Backup(JobGroup): ) from err return True + # Unmount any mounts within folder + bind_mounts = [ + bound.bind_mount + for bound in self.sys_mounts.bound_mounts + if bound.bind_mount.local_where + and bound.bind_mount.local_where.is_relative_to(origin_dir) + ] + if bind_mounts: + await asyncio.gather(*[bind_mount.unmount() for bind_mount in bind_mounts]) + try: return await self.sys_run_in_executor(_restore) finally: diff --git a/supervisor/backups/manager.py b/supervisor/backups/manager.py index 14f45dcc2..4fe110324 100644 --- a/supervisor/backups/manager.py +++ b/supervisor/backups/manager.py @@ -118,14 +118,24 @@ class BackupManager(FileConfiguration, JobGroup): location = self.sys_mounts.default_backup_mount if location: - if not location.local_where.is_mount(): - raise BackupMountDownError( - f"{location.name} is down, cannot back-up to it", _LOGGER.error - ) - return location.local_where + location_mount: Mount = location + return location_mount.local_where return self.sys_config.path_backup + async def _check_location(self, location: LOCATION_TYPE | type[DEFAULT] = DEFAULT): + """Check if backup location is accessible.""" + if location == DEFAULT and self.sys_mounts.default_backup_mount: + location = self.sys_mounts.default_backup_mount + + if location not in (DEFAULT, LOCATION_CLOUD_BACKUP, None): + location_mount: Mount = location + if not await location_mount.is_mounted(): + raise BackupMountDownError( + f"{location_mount.name} is down, cannot back-up to it", + _LOGGER.error, + ) + def 
_get_location_name( self, location: LOCATION_TYPE | type[DEFAULT] = DEFAULT, @@ -352,8 +362,14 @@ class BackupManager(FileConfiguration, JobGroup): copy(backup.tarfile, self.sys_config.path_core_backup) ) elif location: - all_locations[location.name] = Path( - copy(backup.tarfile, location.local_where) + location_mount: Mount = location + if not location_mount.local_where.is_mount(): + raise BackupMountDownError( + f"{location_mount.name} is down, cannot copy to it", + _LOGGER.error, + ) + all_locations[location_mount.name] = Path( + copy(backup.tarfile, location_mount.local_where) ) else: all_locations[None] = Path( @@ -395,6 +411,8 @@ class BackupManager(FileConfiguration, JobGroup): additional_locations: list[LOCATION_TYPE] | None = None, ) -> Backup | None: """Check backup tarfile and import it.""" + await self._check_location(location) + backup = Backup(self.coresys, tar_file, "temp", None) # Read meta data @@ -542,6 +560,8 @@ class BackupManager(FileConfiguration, JobGroup): additional_locations: list[LOCATION_TYPE] | None = None, ) -> Backup | None: """Create a full backup.""" + await self._check_location(location) + if self._get_base_path(location) in { self.sys_config.path_backup, self.sys_config.path_core_backup, @@ -590,6 +610,8 @@ class BackupManager(FileConfiguration, JobGroup): additional_locations: list[LOCATION_TYPE] | None = None, ) -> Backup | None: """Create a partial backup.""" + await self._check_location(location) + if self._get_base_path(location) in { self.sys_config.path_backup, self.sys_config.path_core_backup, diff --git a/supervisor/homeassistant/module.py b/supervisor/homeassistant/module.py index d652b3661..304ca8b46 100644 --- a/supervisor/homeassistant/module.py +++ b/supervisor/homeassistant/module.py @@ -9,6 +9,7 @@ from pathlib import Path, PurePath import shutil import tarfile from tempfile import TemporaryDirectory +from typing import Any from uuid import UUID from awesomeversion import AwesomeVersion, AwesomeVersionException @@ 
-46,7 +47,7 @@ from ..hardware.const import PolicyGroup from ..hardware.data import Device from ..jobs.decorator import Job, JobExecutionLimit from ..resolution.const import UnhealthyReason -from ..utils import remove_folder +from ..utils import remove_folder, remove_folder_with_excludes from ..utils.common import FileConfiguration from ..utils.json import read_json_file, write_json_file from .api import HomeAssistantAPI @@ -457,91 +458,94 @@ class HomeAssistant(FileConfiguration, CoreSysAttributes): self, tar_file: tarfile.TarFile, exclude_database: bool = False ) -> None: """Restore Home Assistant Core config/ directory.""" - with TemporaryDirectory(dir=self.sys_config.path_tmp) as temp: - temp_path = Path(temp) - temp_data = temp_path.joinpath("data") - temp_meta = temp_path.joinpath("homeassistant.json") - # extract backup - def _extract_tarfile(): - """Extract tar backup.""" - with tar_file as backup: - backup.extractall( - path=temp_path, - members=secure_path(backup), - filter="fully_trusted", + def _restore_home_assistant() -> Any: + """Restores data and reads metadata from backup. 
+ + Returns: Home Assistant metdata + """ + with TemporaryDirectory(dir=self.sys_config.path_tmp) as temp: + temp_path = Path(temp) + temp_data = temp_path.joinpath("data") + temp_meta = temp_path.joinpath("homeassistant.json") + + # extract backup + try: + with tar_file as backup: + backup.extractall( + path=temp_path, + members=secure_path(backup), + filter="fully_trusted", + ) + except tarfile.TarError as err: + raise HomeAssistantError( + f"Can't read tarfile {tar_file}: {err}", _LOGGER.error + ) from err + + # Check old backup format v1 + if not temp_data.exists(): + temp_data = temp_path + + _LOGGER.info("Restore Home Assistant Core config folder") + if exclude_database: + remove_folder_with_excludes( + self.sys_config.path_homeassistant, + excludes=HOMEASSISTANT_BACKUP_EXCLUDE_DATABASE, + tmp_dir=self.sys_config.path_tmp, ) + else: + remove_folder(self.sys_config.path_homeassistant) - try: - await self.sys_run_in_executor(_extract_tarfile) - except tarfile.TarError as err: - raise HomeAssistantError( - f"Can't read tarfile {tar_file}: {err}", _LOGGER.error - ) from err + try: + shutil.copytree( + temp_data, + self.sys_config.path_homeassistant, + symlinks=True, + dirs_exist_ok=True, + ) + except shutil.Error as err: + raise HomeAssistantError( + f"Can't restore origin data: {err}", _LOGGER.error + ) from err - # Check old backup format v1 - if not temp_data.exists(): - temp_data = temp_path + _LOGGER.info("Restore Home Assistant Core config folder done") - # Restore data - def _restore_data(): - """Restore data.""" - shutil.copytree( - temp_data, - self.sys_config.path_homeassistant, - symlinks=True, - dirs_exist_ok=True, - ) + if not temp_meta.exists(): + return None + _LOGGER.info("Restore Home Assistant Core metadata") - _LOGGER.info("Restore Home Assistant Core config folder") - excludes = ( - HOMEASSISTANT_BACKUP_EXCLUDE_DATABASE if exclude_database else None - ) - await remove_folder( - self.sys_config.path_homeassistant, - content_only=True, - 
excludes=excludes, - tmp_dir=self.sys_config.path_tmp, - ) - try: - await self.sys_run_in_executor(_restore_data) - except shutil.Error as err: - raise HomeAssistantError( - f"Can't restore origin data: {err}", _LOGGER.error - ) from err + # Read backup data + try: + data = read_json_file(temp_meta) + except ConfigurationFileError as err: + raise HomeAssistantError() from err - _LOGGER.info("Restore Home Assistant Core config folder done") + return data - if not temp_meta.exists(): - return - _LOGGER.info("Restore Home Assistant Core metadata") + data = await self.sys_run_in_executor(_restore_home_assistant) + if data is None: + return - # Read backup data - try: - data = read_json_file(temp_meta) - except ConfigurationFileError as err: - raise HomeAssistantError() from err + # Validate metadata + try: + data = SCHEMA_HASS_CONFIG(data) + except vol.Invalid as err: + raise HomeAssistantError( + f"Can't validate backup data: {humanize_error(data, err)}", + _LOGGER.error, + ) from err - # Validate - try: - data = SCHEMA_HASS_CONFIG(data) - except vol.Invalid as err: - raise HomeAssistantError( - f"Can't validate backup data: {humanize_error(data, err)}", - _LOGGER.err, - ) from err - - # Restore metadata - for attr in ( - ATTR_AUDIO_INPUT, - ATTR_AUDIO_OUTPUT, - ATTR_PORT, - ATTR_SSL, - ATTR_REFRESH_TOKEN, - ATTR_WATCHDOG, - ): - if attr in data: - self._data[attr] = data[attr] + # Restore metadata + for attr in ( + ATTR_AUDIO_INPUT, + ATTR_AUDIO_OUTPUT, + ATTR_PORT, + ATTR_SSL, + ATTR_REFRESH_TOKEN, + ATTR_WATCHDOG, + ): + if attr in data: + self._data[attr] = data[attr] @Job( name="home_assistant_get_users", diff --git a/supervisor/resolution/fixups/store_execute_reset.py b/supervisor/resolution/fixups/store_execute_reset.py index 2d3b3d826..45bf15c7f 100644 --- a/supervisor/resolution/fixups/store_execute_reset.py +++ b/supervisor/resolution/fixups/store_execute_reset.py @@ -40,7 +40,7 @@ class FixupStoreExecuteReset(FixupBase): _LOGGER.warning("Can't find store %s 
for fixup", reference) return - await remove_folder(repository.git.path) + await self.sys_run_in_executor(remove_folder, repository.git.path) # Load data again try: diff --git a/supervisor/store/git.py b/supervisor/store/git.py index 69c637814..1cf3b0551 100644 --- a/supervisor/store/git.py +++ b/supervisor/store/git.py @@ -189,9 +189,13 @@ class GitRepo(CoreSysAttributes): _LOGGER.warning("There is already a task in progress") return - if not self.path.is_dir(): - return - await remove_folder(self.path) + def _remove_git_dir(path: Path) -> None: + if not path.is_dir(): + return + remove_folder(path) + + async with self.lock: + await self.sys_run_in_executor(_remove_git_dir, self.path) class GitRepoHassIO(GitRepo): diff --git a/supervisor/utils/__init__.py b/supervisor/utils/__init__.py index a0e7c85b7..a9ed34447 100644 --- a/supervisor/utils/__init__.py +++ b/supervisor/utils/__init__.py @@ -8,6 +8,7 @@ import os from pathlib import Path import re import socket +import subprocess from tempfile import TemporaryDirectory from typing import Any @@ -80,11 +81,9 @@ def get_message_from_exception_chain(err: Exception) -> str: return get_message_from_exception_chain(err.__context__) -async def remove_folder( +def remove_folder( folder: Path, content_only: bool = False, - excludes: list[str] | None = None, - tmp_dir: Path | None = None, ) -> None: """Remove folder and reset privileged. 
@@ -92,48 +91,40 @@ async def remove_folder( - CAP_DAC_OVERRIDE - CAP_DAC_READ_SEARCH """ - if excludes: - if not tmp_dir: - raise ValueError("tmp_dir is required if excludes are provided") - if not content_only: - raise ValueError("Cannot delete the folder if excludes are provided") + find_args = [] + if content_only: + find_args.extend(["-mindepth", "1"]) + try: + proc = subprocess.run( + ["/usr/bin/find", str(folder), "-xdev", *find_args, "-delete"], + stdout=subprocess.DEVNULL, + stderr=subprocess.PIPE, + env=clean_env(), + text=True, + check=True, + ) + if proc.returncode != 0: + _LOGGER.error("Can't remove folder %s: %s", folder, proc.stderr.strip()) + except OSError as err: + _LOGGER.exception("Can't remove folder %s: %s", folder, err) - temp = TemporaryDirectory(dir=tmp_dir) - temp_path = Path(temp.name) + +def remove_folder_with_excludes( + folder: Path, + excludes: list[str], + tmp_dir: Path | None = None, +) -> None: + """Remove folder with excludes.""" + with TemporaryDirectory(dir=tmp_dir) as temp_path: + temp_path = Path(temp_path) moved_files: list[Path] = [] for item in folder.iterdir(): if any(item.match(exclude) for exclude in excludes): moved_files.append(item.rename(temp_path / item.name)) - find_args = [] - if content_only: - find_args.extend(["-mindepth", "1"]) - try: - proc = await asyncio.create_subprocess_exec( - "/usr/bin/find", - folder, - "-xdev", - *find_args, - "-delete", - stdout=asyncio.subprocess.DEVNULL, - stderr=asyncio.subprocess.PIPE, - env=clean_env(), - ) - - _, error_msg = await proc.communicate() - except OSError as err: - _LOGGER.exception("Can't remove folder %s: %s", folder, err) - else: - if proc.returncode == 0: - return - _LOGGER.error( - "Can't remove folder %s: %s", folder, error_msg.decode("utf-8").strip() - ) - finally: - if excludes: - for item in moved_files: - item.rename(folder / item.name) - temp.cleanup() + remove_folder(folder, content_only=True) + for item in moved_files: + item.rename(folder / item.name) 
def clean_env() -> dict[str, str]: diff --git a/tests/utils/test_remove_folder.py b/tests/utils/test_remove_folder.py index 733baa3b3..22329bfce 100644 --- a/tests/utils/test_remove_folder.py +++ b/tests/utils/test_remove_folder.py @@ -3,13 +3,10 @@ from pathlib import Path import shutil -import pytest - from supervisor.utils import remove_folder -@pytest.mark.asyncio -async def test_remove_all(tmp_path): +def test_remove_all(tmp_path): """Test remove folder.""" # Prepair test folder temp_orig = tmp_path.joinpath("orig") @@ -17,12 +14,11 @@ async def test_remove_all(tmp_path): shutil.copytree(fixture_data, temp_orig, symlinks=True) assert temp_orig.exists() - await remove_folder(temp_orig) + remove_folder(temp_orig) assert not temp_orig.exists() -@pytest.mark.asyncio -async def test_remove_content(tmp_path): +def test_remove_content(tmp_path): """Test remove content of folder.""" # Prepair test folder temp_orig = tmp_path.joinpath("orig") @@ -38,8 +34,7 @@ async def test_remove_content(tmp_path): assert test_folder.exists() assert test_file.exists() assert test_hidden.exists() - - await remove_folder(temp_orig, content_only=True) + remove_folder(temp_orig, content_only=True) assert not test_folder.exists() assert not test_file.exists()