Remove I/O in event loop for backup create and restore operations (#5634)

* Remove I/O from backup create() function

* Move mount check into executor thread

* Remove I/O from backup open() function

* Remove I/O from _folder_save()

* Refactor remove_folder and remove_folder_with_excludes

Make remove_folder and remove_folder_with_excludes synchronous
functions which need to be run in an executor thread to be safely used
in asyncio. This makes them better composable with other I/O operations
like checking for file existence etc.

* Fix logger typo

* Use return values for functions running in an executor

* Move location check into a separate function

* Fix extract
This commit is contained in:
Stefan Agner 2025-02-18 20:59:09 +01:00 committed by GitHub
parent 4054749eb2
commit 606db3585c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 219 additions and 190 deletions

View File

@ -94,7 +94,7 @@ class Backup(JobGroup):
coresys, JOB_GROUP_BACKUP.format_map(defaultdict(str, slug=slug)), slug coresys, JOB_GROUP_BACKUP.format_map(defaultdict(str, slug=slug)), slug
) )
self._data: dict[str, Any] = data or {ATTR_SLUG: slug} self._data: dict[str, Any] = data or {ATTR_SLUG: slug}
self._tmp = None self._tmp: TemporaryDirectory = None
self._outer_secure_tarfile: SecureTarFile | None = None self._outer_secure_tarfile: SecureTarFile | None = None
self._key: bytes | None = None self._key: bytes | None = None
self._aes: Cipher | None = None self._aes: Cipher | None = None
@ -463,23 +463,31 @@ class Backup(JobGroup):
@asynccontextmanager @asynccontextmanager
async def create(self) -> AsyncGenerator[None]: async def create(self) -> AsyncGenerator[None]:
"""Create new backup file.""" """Create new backup file."""
if self.tarfile.is_file():
raise BackupError(
f"Cannot make new backup at {self.tarfile.as_posix()}, file already exists!",
_LOGGER.error,
)
self._outer_secure_tarfile = SecureTarFile( def _open_outer_tarfile():
self.tarfile, """Create and open outer tarfile."""
"w", if self.tarfile.is_file():
gzip=False, raise BackupError(
bufsize=BUF_SIZE, f"Cannot make new backup at {self.tarfile.as_posix()}, file already exists!",
_LOGGER.error,
)
outer_secure_tarfile = SecureTarFile(
self.tarfile,
"w",
gzip=False,
bufsize=BUF_SIZE,
)
return outer_secure_tarfile, outer_secure_tarfile.open()
self._outer_secure_tarfile, outer_tarfile = await self.sys_run_in_executor(
_open_outer_tarfile
) )
try: try:
with self._outer_secure_tarfile as outer_tarfile: yield
yield
await self._create_cleanup(outer_tarfile)
finally: finally:
await self._create_cleanup(outer_tarfile)
await self.sys_run_in_executor(self._outer_secure_tarfile.close)
self._outer_secure_tarfile = None self._outer_secure_tarfile = None
@asynccontextmanager @asynccontextmanager
@ -496,28 +504,34 @@ class Backup(JobGroup):
if location == DEFAULT if location == DEFAULT
else self.all_locations[location][ATTR_PATH] else self.all_locations[location][ATTR_PATH]
) )
if not backup_tarfile.is_file():
self.sys_create_task(self.sys_backups.reload(location))
raise BackupFileNotFoundError(
f"Cannot open backup at {backup_tarfile.as_posix()}, file does not exist!",
_LOGGER.error,
)
# extract an existing backup # extract an existing backup
self._tmp = TemporaryDirectory(dir=str(backup_tarfile.parent))
def _extract_backup(): def _extract_backup():
"""Extract a backup.""" if not backup_tarfile.is_file():
raise BackupFileNotFoundError(
f"Cannot open backup at {backup_tarfile.as_posix()}, file does not exist!",
_LOGGER.error,
)
tmp = TemporaryDirectory(dir=str(backup_tarfile.parent))
with tarfile.open(backup_tarfile, "r:") as tar: with tarfile.open(backup_tarfile, "r:") as tar:
tar.extractall( tar.extractall(
path=self._tmp.name, path=tmp.name,
members=secure_path(tar), members=secure_path(tar),
filter="fully_trusted", filter="fully_trusted",
) )
with self._tmp: return tmp
await self.sys_run_in_executor(_extract_backup)
try:
self._tmp = await self.sys_run_in_executor(_extract_backup)
yield yield
except BackupFileNotFoundError as err:
self.sys_create_task(self.sys_backups.reload(location))
raise err
finally:
if self._tmp:
self._tmp.cleanup()
async def _create_cleanup(self, outer_tarfile: TarFile) -> None: async def _create_cleanup(self, outer_tarfile: TarFile) -> None:
"""Cleanup after backup creation. """Cleanup after backup creation.
@ -669,17 +683,16 @@ class Backup(JobGroup):
async def _folder_save(self, name: str): async def _folder_save(self, name: str):
"""Take backup of a folder.""" """Take backup of a folder."""
self.sys_jobs.current.reference = name self.sys_jobs.current.reference = name
slug_name = name.replace("/", "_") slug_name = name.replace("/", "_")
tar_name = f"{slug_name}.tar{'.gz' if self.compressed else ''}" tar_name = f"{slug_name}.tar{'.gz' if self.compressed else ''}"
origin_dir = Path(self.sys_config.path_supervisor, name) origin_dir = Path(self.sys_config.path_supervisor, name)
# Check if exists def _save() -> bool:
if not origin_dir.is_dir(): # Check if exists
_LOGGER.warning("Can't find backup folder %s", name) if not origin_dir.is_dir():
return _LOGGER.warning("Can't find backup folder %s", name)
return False
def _save() -> None:
# Take backup # Take backup
_LOGGER.info("Backing up folder %s", name) _LOGGER.info("Backing up folder %s", name)
@ -712,16 +725,16 @@ class Backup(JobGroup):
) )
_LOGGER.info("Backup folder %s done", name) _LOGGER.info("Backup folder %s done", name)
return True
try: try:
await self.sys_run_in_executor(_save) if await self.sys_run_in_executor(_save):
self._data[ATTR_FOLDERS].append(name)
except (tarfile.TarError, OSError) as err: except (tarfile.TarError, OSError) as err:
raise BackupError( raise BackupError(
f"Can't backup folder {name}: {str(err)}", _LOGGER.error f"Can't backup folder {name}: {str(err)}", _LOGGER.error
) from err ) from err
self._data[ATTR_FOLDERS].append(name)
@Job(name="backup_store_folders", cleanup=False) @Job(name="backup_store_folders", cleanup=False)
async def store_folders(self, folder_list: list[str]): async def store_folders(self, folder_list: list[str]):
"""Backup Supervisor data into backup.""" """Backup Supervisor data into backup."""
@ -740,28 +753,18 @@ class Backup(JobGroup):
) )
origin_dir = Path(self.sys_config.path_supervisor, name) origin_dir = Path(self.sys_config.path_supervisor, name)
# Check if exists inside backup
if not tar_name.exists():
raise BackupInvalidError(
f"Can't find restore folder {name}", _LOGGER.warning
)
# Unmount any mounts within folder
bind_mounts = [
bound.bind_mount
for bound in self.sys_mounts.bound_mounts
if bound.bind_mount.local_where
and bound.bind_mount.local_where.is_relative_to(origin_dir)
]
if bind_mounts:
await asyncio.gather(*[bind_mount.unmount() for bind_mount in bind_mounts])
# Clean old stuff
if origin_dir.is_dir():
await remove_folder(origin_dir, content_only=True)
# Perform a restore # Perform a restore
def _restore() -> bool: def _restore() -> bool:
# Check if exists inside backup
if not tar_name.exists():
raise BackupInvalidError(
f"Can't find restore folder {name}", _LOGGER.warning
)
# Clean old stuff
if origin_dir.is_dir():
remove_folder(origin_dir, content_only=True)
try: try:
_LOGGER.info("Restore folder %s", name) _LOGGER.info("Restore folder %s", name)
with SecureTarFile( with SecureTarFile(
@ -781,6 +784,16 @@ class Backup(JobGroup):
) from err ) from err
return True return True
# Unmount any mounts within folder
bind_mounts = [
bound.bind_mount
for bound in self.sys_mounts.bound_mounts
if bound.bind_mount.local_where
and bound.bind_mount.local_where.is_relative_to(origin_dir)
]
if bind_mounts:
await asyncio.gather(*[bind_mount.unmount() for bind_mount in bind_mounts])
try: try:
return await self.sys_run_in_executor(_restore) return await self.sys_run_in_executor(_restore)
finally: finally:

View File

@ -118,14 +118,24 @@ class BackupManager(FileConfiguration, JobGroup):
location = self.sys_mounts.default_backup_mount location = self.sys_mounts.default_backup_mount
if location: if location:
if not location.local_where.is_mount(): location_mount: Mount = location
raise BackupMountDownError( return location_mount.local_where
f"{location.name} is down, cannot back-up to it", _LOGGER.error
)
return location.local_where
return self.sys_config.path_backup return self.sys_config.path_backup
async def _check_location(self, location: LOCATION_TYPE | type[DEFAULT] = DEFAULT):
"""Check if backup location is accessible."""
if location == DEFAULT and self.sys_mounts.default_backup_mount:
location = self.sys_mounts.default_backup_mount
if location not in (DEFAULT, LOCATION_CLOUD_BACKUP, None):
location_mount: Mount = location
if not await location_mount.is_mounted():
raise BackupMountDownError(
f"{location_mount.name} is down, cannot back-up to it",
_LOGGER.error,
)
def _get_location_name( def _get_location_name(
self, self,
location: LOCATION_TYPE | type[DEFAULT] = DEFAULT, location: LOCATION_TYPE | type[DEFAULT] = DEFAULT,
@ -352,8 +362,14 @@ class BackupManager(FileConfiguration, JobGroup):
copy(backup.tarfile, self.sys_config.path_core_backup) copy(backup.tarfile, self.sys_config.path_core_backup)
) )
elif location: elif location:
all_locations[location.name] = Path( location_mount: Mount = location
copy(backup.tarfile, location.local_where) if not location_mount.local_where.is_mount():
raise BackupMountDownError(
f"{location_mount.name} is down, cannot copy to it",
_LOGGER.error,
)
all_locations[location_mount.name] = Path(
copy(backup.tarfile, location_mount.local_where)
) )
else: else:
all_locations[None] = Path( all_locations[None] = Path(
@ -395,6 +411,8 @@ class BackupManager(FileConfiguration, JobGroup):
additional_locations: list[LOCATION_TYPE] | None = None, additional_locations: list[LOCATION_TYPE] | None = None,
) -> Backup | None: ) -> Backup | None:
"""Check backup tarfile and import it.""" """Check backup tarfile and import it."""
await self._check_location(location)
backup = Backup(self.coresys, tar_file, "temp", None) backup = Backup(self.coresys, tar_file, "temp", None)
# Read meta data # Read meta data
@ -542,6 +560,8 @@ class BackupManager(FileConfiguration, JobGroup):
additional_locations: list[LOCATION_TYPE] | None = None, additional_locations: list[LOCATION_TYPE] | None = None,
) -> Backup | None: ) -> Backup | None:
"""Create a full backup.""" """Create a full backup."""
await self._check_location(location)
if self._get_base_path(location) in { if self._get_base_path(location) in {
self.sys_config.path_backup, self.sys_config.path_backup,
self.sys_config.path_core_backup, self.sys_config.path_core_backup,
@ -590,6 +610,8 @@ class BackupManager(FileConfiguration, JobGroup):
additional_locations: list[LOCATION_TYPE] | None = None, additional_locations: list[LOCATION_TYPE] | None = None,
) -> Backup | None: ) -> Backup | None:
"""Create a partial backup.""" """Create a partial backup."""
await self._check_location(location)
if self._get_base_path(location) in { if self._get_base_path(location) in {
self.sys_config.path_backup, self.sys_config.path_backup,
self.sys_config.path_core_backup, self.sys_config.path_core_backup,

View File

@ -9,6 +9,7 @@ from pathlib import Path, PurePath
import shutil import shutil
import tarfile import tarfile
from tempfile import TemporaryDirectory from tempfile import TemporaryDirectory
from typing import Any
from uuid import UUID from uuid import UUID
from awesomeversion import AwesomeVersion, AwesomeVersionException from awesomeversion import AwesomeVersion, AwesomeVersionException
@ -46,7 +47,7 @@ from ..hardware.const import PolicyGroup
from ..hardware.data import Device from ..hardware.data import Device
from ..jobs.decorator import Job, JobExecutionLimit from ..jobs.decorator import Job, JobExecutionLimit
from ..resolution.const import UnhealthyReason from ..resolution.const import UnhealthyReason
from ..utils import remove_folder from ..utils import remove_folder, remove_folder_with_excludes
from ..utils.common import FileConfiguration from ..utils.common import FileConfiguration
from ..utils.json import read_json_file, write_json_file from ..utils.json import read_json_file, write_json_file
from .api import HomeAssistantAPI from .api import HomeAssistantAPI
@ -457,91 +458,94 @@ class HomeAssistant(FileConfiguration, CoreSysAttributes):
self, tar_file: tarfile.TarFile, exclude_database: bool = False self, tar_file: tarfile.TarFile, exclude_database: bool = False
) -> None: ) -> None:
"""Restore Home Assistant Core config/ directory.""" """Restore Home Assistant Core config/ directory."""
with TemporaryDirectory(dir=self.sys_config.path_tmp) as temp:
temp_path = Path(temp)
temp_data = temp_path.joinpath("data")
temp_meta = temp_path.joinpath("homeassistant.json")
# extract backup def _restore_home_assistant() -> Any:
def _extract_tarfile(): """Restores data and reads metadata from backup.
"""Extract tar backup."""
with tar_file as backup: Returns: Home Assistant metdata
backup.extractall( """
path=temp_path, with TemporaryDirectory(dir=self.sys_config.path_tmp) as temp:
members=secure_path(backup), temp_path = Path(temp)
filter="fully_trusted", temp_data = temp_path.joinpath("data")
temp_meta = temp_path.joinpath("homeassistant.json")
# extract backup
try:
with tar_file as backup:
backup.extractall(
path=temp_path,
members=secure_path(backup),
filter="fully_trusted",
)
except tarfile.TarError as err:
raise HomeAssistantError(
f"Can't read tarfile {tar_file}: {err}", _LOGGER.error
) from err
# Check old backup format v1
if not temp_data.exists():
temp_data = temp_path
_LOGGER.info("Restore Home Assistant Core config folder")
if exclude_database:
remove_folder_with_excludes(
self.sys_config.path_homeassistant,
excludes=HOMEASSISTANT_BACKUP_EXCLUDE_DATABASE,
tmp_dir=self.sys_config.path_tmp,
) )
else:
remove_folder(self.sys_config.path_homeassistant)
try: try:
await self.sys_run_in_executor(_extract_tarfile) shutil.copytree(
except tarfile.TarError as err: temp_data,
raise HomeAssistantError( self.sys_config.path_homeassistant,
f"Can't read tarfile {tar_file}: {err}", _LOGGER.error symlinks=True,
) from err dirs_exist_ok=True,
)
except shutil.Error as err:
raise HomeAssistantError(
f"Can't restore origin data: {err}", _LOGGER.error
) from err
# Check old backup format v1 _LOGGER.info("Restore Home Assistant Core config folder done")
if not temp_data.exists():
temp_data = temp_path
# Restore data if not temp_meta.exists():
def _restore_data(): return None
"""Restore data.""" _LOGGER.info("Restore Home Assistant Core metadata")
shutil.copytree(
temp_data,
self.sys_config.path_homeassistant,
symlinks=True,
dirs_exist_ok=True,
)
_LOGGER.info("Restore Home Assistant Core config folder") # Read backup data
excludes = ( try:
HOMEASSISTANT_BACKUP_EXCLUDE_DATABASE if exclude_database else None data = read_json_file(temp_meta)
) except ConfigurationFileError as err:
await remove_folder( raise HomeAssistantError() from err
self.sys_config.path_homeassistant,
content_only=True,
excludes=excludes,
tmp_dir=self.sys_config.path_tmp,
)
try:
await self.sys_run_in_executor(_restore_data)
except shutil.Error as err:
raise HomeAssistantError(
f"Can't restore origin data: {err}", _LOGGER.error
) from err
_LOGGER.info("Restore Home Assistant Core config folder done") return data
if not temp_meta.exists(): data = await self.sys_run_in_executor(_restore_home_assistant)
return if data is None:
_LOGGER.info("Restore Home Assistant Core metadata") return
# Read backup data # Validate metadata
try: try:
data = read_json_file(temp_meta) data = SCHEMA_HASS_CONFIG(data)
except ConfigurationFileError as err: except vol.Invalid as err:
raise HomeAssistantError() from err raise HomeAssistantError(
f"Can't validate backup data: {humanize_error(data, err)}",
_LOGGER.error,
) from err
# Validate # Restore metadata
try: for attr in (
data = SCHEMA_HASS_CONFIG(data) ATTR_AUDIO_INPUT,
except vol.Invalid as err: ATTR_AUDIO_OUTPUT,
raise HomeAssistantError( ATTR_PORT,
f"Can't validate backup data: {humanize_error(data, err)}", ATTR_SSL,
_LOGGER.err, ATTR_REFRESH_TOKEN,
) from err ATTR_WATCHDOG,
):
# Restore metadata if attr in data:
for attr in ( self._data[attr] = data[attr]
ATTR_AUDIO_INPUT,
ATTR_AUDIO_OUTPUT,
ATTR_PORT,
ATTR_SSL,
ATTR_REFRESH_TOKEN,
ATTR_WATCHDOG,
):
if attr in data:
self._data[attr] = data[attr]
@Job( @Job(
name="home_assistant_get_users", name="home_assistant_get_users",

View File

@ -40,7 +40,7 @@ class FixupStoreExecuteReset(FixupBase):
_LOGGER.warning("Can't find store %s for fixup", reference) _LOGGER.warning("Can't find store %s for fixup", reference)
return return
await remove_folder(repository.git.path) await self.sys_run_in_executor(remove_folder, repository.git.path)
# Load data again # Load data again
try: try:

View File

@ -189,9 +189,13 @@ class GitRepo(CoreSysAttributes):
_LOGGER.warning("There is already a task in progress") _LOGGER.warning("There is already a task in progress")
return return
if not self.path.is_dir(): def _remove_git_dir(path: Path) -> None:
return if not path.is_dir():
await remove_folder(self.path) return
remove_folder(path)
async with self.lock:
await self.sys_run_in_executor(_remove_git_dir, self.path)
class GitRepoHassIO(GitRepo): class GitRepoHassIO(GitRepo):

View File

@ -8,6 +8,7 @@ import os
from pathlib import Path from pathlib import Path
import re import re
import socket import socket
import subprocess
from tempfile import TemporaryDirectory from tempfile import TemporaryDirectory
from typing import Any from typing import Any
@ -80,11 +81,9 @@ def get_message_from_exception_chain(err: Exception) -> str:
return get_message_from_exception_chain(err.__context__) return get_message_from_exception_chain(err.__context__)
async def remove_folder( def remove_folder(
folder: Path, folder: Path,
content_only: bool = False, content_only: bool = False,
excludes: list[str] | None = None,
tmp_dir: Path | None = None,
) -> None: ) -> None:
"""Remove folder and reset privileged. """Remove folder and reset privileged.
@ -92,48 +91,40 @@ async def remove_folder(
- CAP_DAC_OVERRIDE - CAP_DAC_OVERRIDE
- CAP_DAC_READ_SEARCH - CAP_DAC_READ_SEARCH
""" """
if excludes: find_args = []
if not tmp_dir: if content_only:
raise ValueError("tmp_dir is required if excludes are provided") find_args.extend(["-mindepth", "1"])
if not content_only: try:
raise ValueError("Cannot delete the folder if excludes are provided") proc = subprocess.run(
["/usr/bin/find", str(folder), "-xdev", *find_args, "-delete"],
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
env=clean_env(),
text=True,
check=True,
)
if proc.returncode != 0:
_LOGGER.error("Can't remove folder %s: %s", folder, proc.stderr.strip())
except OSError as err:
_LOGGER.exception("Can't remove folder %s: %s", folder, err)
temp = TemporaryDirectory(dir=tmp_dir)
temp_path = Path(temp.name) def remove_folder_with_excludes(
folder: Path,
excludes: list[str],
tmp_dir: Path | None = None,
) -> None:
"""Remove folder with excludes."""
with TemporaryDirectory(dir=tmp_dir) as temp_path:
temp_path = Path(temp_path)
moved_files: list[Path] = [] moved_files: list[Path] = []
for item in folder.iterdir(): for item in folder.iterdir():
if any(item.match(exclude) for exclude in excludes): if any(item.match(exclude) for exclude in excludes):
moved_files.append(item.rename(temp_path / item.name)) moved_files.append(item.rename(temp_path / item.name))
find_args = [] remove_folder(folder, content_only=True)
if content_only: for item in moved_files:
find_args.extend(["-mindepth", "1"]) item.rename(folder / item.name)
try:
proc = await asyncio.create_subprocess_exec(
"/usr/bin/find",
folder,
"-xdev",
*find_args,
"-delete",
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.PIPE,
env=clean_env(),
)
_, error_msg = await proc.communicate()
except OSError as err:
_LOGGER.exception("Can't remove folder %s: %s", folder, err)
else:
if proc.returncode == 0:
return
_LOGGER.error(
"Can't remove folder %s: %s", folder, error_msg.decode("utf-8").strip()
)
finally:
if excludes:
for item in moved_files:
item.rename(folder / item.name)
temp.cleanup()
def clean_env() -> dict[str, str]: def clean_env() -> dict[str, str]:

View File

@ -3,13 +3,10 @@
from pathlib import Path from pathlib import Path
import shutil import shutil
import pytest
from supervisor.utils import remove_folder from supervisor.utils import remove_folder
@pytest.mark.asyncio def test_remove_all(tmp_path):
async def test_remove_all(tmp_path):
"""Test remove folder.""" """Test remove folder."""
# Prepair test folder # Prepair test folder
temp_orig = tmp_path.joinpath("orig") temp_orig = tmp_path.joinpath("orig")
@ -17,12 +14,11 @@ async def test_remove_all(tmp_path):
shutil.copytree(fixture_data, temp_orig, symlinks=True) shutil.copytree(fixture_data, temp_orig, symlinks=True)
assert temp_orig.exists() assert temp_orig.exists()
await remove_folder(temp_orig) remove_folder(temp_orig)
assert not temp_orig.exists() assert not temp_orig.exists()
@pytest.mark.asyncio def test_remove_content(tmp_path):
async def test_remove_content(tmp_path):
"""Test remove content of folder.""" """Test remove content of folder."""
# Prepair test folder # Prepair test folder
temp_orig = tmp_path.joinpath("orig") temp_orig = tmp_path.joinpath("orig")
@ -38,8 +34,7 @@ async def test_remove_content(tmp_path):
assert test_folder.exists() assert test_folder.exists()
assert test_file.exists() assert test_file.exists()
assert test_hidden.exists() assert test_hidden.exists()
remove_folder(temp_orig, content_only=True)
await remove_folder(temp_orig, content_only=True)
assert not test_folder.exists() assert not test_folder.exists()
assert not test_file.exists() assert not test_file.exists()