Remove I/O in event loop for backup load, import and remove (#5647)

* Avoid I/O in the event loop when removing a backup

* Refactor backup size calculation

Currently the size is lazily loaded on demand via properties. This,
however, blocks the async event loop.

Backup sizes don't change. So instead of lazily loading the size of a
backup, simply determine it on load or right after creation.

* Fix tests for backup size change

* Avoid I/O in the event loop when loading backups

* Avoid I/O in the event loop when importing a backup
Stefan Agner 2025-02-19 16:00:17 +01:00 committed by GitHub
parent 37bc703bbb
commit 34939cfe52
11 changed files with 102 additions and 75 deletions
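
The underlying issue, in miniature: a property that calls Path.stat() performs synchronous filesystem I/O on whatever thread evaluates it, which in Supervisor's case is the asyncio event loop. The fix applied throughout this commit is to compute such values once, off the loop, and store them as plain attributes. A generic before/after sketch (illustrative names, not the Supervisor classes):

import asyncio
from pathlib import Path


class LazyBackup:
    """Before: stat() runs on the event loop every time size_bytes is read."""

    def __init__(self, tar_path: Path) -> None:
        self.tar_path = tar_path

    @property
    def size_bytes(self) -> int:
        return self.tar_path.stat().st_size  # blocking I/O on the loop


class EagerBackup:
    """After: the size is determined once, in an executor, at load time."""

    def __init__(self, tar_path: Path, size_bytes: int) -> None:
        self.tar_path = tar_path
        self.size_bytes = size_bytes  # plain attribute, no I/O on access

    @classmethod
    async def load(cls, tar_path: Path) -> "EagerBackup":
        loop = asyncio.get_running_loop()
        size = await loop.run_in_executor(None, lambda: tar_path.stat().st_size)
        return cls(tar_path, size)

Because backup files are immutable once written, the stored value never goes stale; this is also why the lru_cache on _backup_file_size can simply be dropped below.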


@@ -39,6 +39,7 @@ from ..const import (
     ATTR_PROTECTED,
     ATTR_REPOSITORIES,
     ATTR_SIZE,
+    ATTR_SIZE_BYTES,
     ATTR_SLUG,
     ATTR_SUPERVISOR_VERSION,
     ATTR_TIMEOUT,
@@ -58,7 +59,6 @@ from .const import (
     ATTR_BACKGROUND,
     ATTR_LOCATION_ATTRIBUTES,
     ATTR_LOCATIONS,
-    ATTR_SIZE_BYTES,
     CONTENT_TYPE_TAR,
 )
 from .utils import api_process, api_validate
@@ -155,7 +155,7 @@ class APIBackups(CoreSysAttributes):
         return {
             loc if loc else LOCATION_LOCAL: {
                 ATTR_PROTECTED: backup.all_locations[loc][ATTR_PROTECTED],
-                ATTR_SIZE_BYTES: backup.location_size(loc),
+                ATTR_SIZE_BYTES: backup.all_locations[loc][ATTR_SIZE_BYTES],
             }
             for loc in backup.locations
         }
@@ -457,7 +457,7 @@ class APIBackups(CoreSysAttributes):
         else:
             self._validate_cloud_backup_location(request, backup.location)
 
-        self.sys_backups.remove(backup, locations=locations)
+        await self.sys_backups.remove(backup, locations=locations)
 
     @api_process
     async def download(self, request: web.Request):


@@ -60,7 +60,6 @@ ATTR_REVISION = "revision"
 ATTR_SAFE_MODE = "safe_mode"
 ATTR_SEAT = "seat"
 ATTR_SIGNED = "signed"
-ATTR_SIZE_BYTES = "size_bytes"
 ATTR_STARTUP_TIME = "startup_time"
 ATTR_STATUS = "status"
 ATTR_SUBSYSTEM = "subsystem"


@@ -6,7 +6,6 @@ from collections.abc import AsyncGenerator, Awaitable
 from contextlib import asynccontextmanager
 from copy import deepcopy
 from datetime import timedelta
-from functools import lru_cache
 import io
 import json
 import logging
@@ -40,6 +39,7 @@ from ..const import (
     ATTR_PROTECTED,
     ATTR_REPOSITORIES,
     ATTR_SIZE,
+    ATTR_SIZE_BYTES,
     ATTR_SLUG,
     ATTR_SUPERVISOR_VERSION,
     ATTR_TYPE,
@@ -67,12 +67,6 @@ from .validate import SCHEMA_BACKUP
 _LOGGER: logging.Logger = logging.getLogger(__name__)
 
 
-@lru_cache
-def _backup_file_size(backup: Path) -> int:
-    """Get backup file size."""
-    return backup.stat().st_size if backup.is_file() else 0
-
-
 def location_sort_key(value: str | None) -> str:
     """Sort locations, None is always first else alphabetical."""
     return value if value else ""
@@ -88,6 +82,7 @@ class Backup(JobGroup):
         slug: str,
         location: str | None,
         data: dict[str, Any] | None = None,
+        size_bytes: int = 0,
     ):
         """Initialize a backup."""
         super().__init__(
@@ -102,6 +97,7 @@ class Backup(JobGroup):
             location: {
                 ATTR_PATH: tar_file,
                 ATTR_PROTECTED: data.get(ATTR_PROTECTED, False) if data else False,
+                ATTR_SIZE_BYTES: size_bytes,
             }
         }
@@ -236,7 +232,7 @@ class Backup(JobGroup):
     @property
     def size_bytes(self) -> int:
         """Return backup size in bytes."""
-        return self.location_size(self.location)
+        return self._locations[self.location][ATTR_SIZE_BYTES]
 
     @property
     def is_new(self) -> bool:
@@ -260,14 +256,6 @@ class Backup(JobGroup):
         """Returns a copy of the data."""
         return deepcopy(self._data)
 
-    def location_size(self, location: str | None) -> int:
-        """Get size of backup in a location."""
-        if location not in self.all_locations:
-            return 0
-
-        backup = self.all_locations[location][ATTR_PATH]
-        return _backup_file_size(backup)
-
     def __eq__(self, other: Any) -> bool:
         """Return true if backups have same metadata."""
         if not isinstance(other, Backup):
@@ -416,23 +404,24 @@ class Backup(JobGroup):
     async def load(self):
         """Read backup.json from tar file."""
-        if not self.tarfile.is_file():
-            _LOGGER.error("No tarfile located at %s", self.tarfile)
-            return False
-
-        def _load_file():
-            """Read backup.json."""
-            with tarfile.open(self.tarfile, "r:") as backup:
+        def _load_file(tarfile_path: Path):
+            """Get backup size and read backup metadata."""
+            size_bytes = tarfile_path.stat().st_size
+            with tarfile.open(tarfile_path, "r:") as backup:
                 if "./snapshot.json" in [entry.name for entry in backup.getmembers()]:
                     # Old backups still use "snapshot.json", we need to support that forever
                     json_file = backup.extractfile("./snapshot.json")
                 else:
                     json_file = backup.extractfile("./backup.json")
-                return json_file.read()
+                return size_bytes, json_file.read()
 
         # read backup.json
         try:
-            raw = await self.sys_run_in_executor(_load_file)
+            size_bytes, raw = await self.sys_run_in_executor(_load_file, self.tarfile)
+        except FileNotFoundError:
+            _LOGGER.error("No tarfile located at %s", self.tarfile)
+            return False
         except (tarfile.TarError, KeyError) as err:
             _LOGGER.error("Can't read backup tarfile %s: %s", self.tarfile, err)
             return False
@@ -457,6 +446,7 @@ class Backup(JobGroup):
         if self._data[ATTR_PROTECTED]:
             self._locations[self.location][ATTR_PROTECTED] = True
+        self._locations[self.location][ATTR_SIZE_BYTES] = size_bytes
 
         return True
@@ -480,6 +470,11 @@ class Backup(JobGroup):
             )
             return outer_secure_tarfile, outer_secure_tarfile.open()
 
+        def _close_outer_tarfile() -> int:
+            """Close outer tarfile."""
+            self._outer_secure_tarfile.close()
+            return self.tarfile.stat().st_size
+
         self._outer_secure_tarfile, outer_tarfile = await self.sys_run_in_executor(
             _open_outer_tarfile
         )
@@ -487,7 +482,8 @@ class Backup(JobGroup):
             yield
         finally:
             await self._create_cleanup(outer_tarfile)
-            await self.sys_run_in_executor(self._outer_secure_tarfile.close)
+            size_bytes = await self.sys_run_in_executor(_close_outer_tarfile)
+            self._locations[self.location][ATTR_SIZE_BYTES] = size_bytes
             self._outer_secure_tarfile = None
 
     @asynccontextmanager
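
One detail of the new load() worth noting: the stat() happens inside the same executor job that opens the tar, so a single thread-pool round trip yields both the size and the metadata, and the former up-front tarfile.is_file() check (itself blocking I/O) is replaced by catching FileNotFoundError from that job. A minimal standalone sketch of the shape (plain asyncio, with loop.run_in_executor assumed to match sys_run_in_executor's semantics):

import asyncio
import tarfile
from pathlib import Path


def _load_file(tar_path: Path) -> tuple[int, bytes]:
    """Blocking half: stat the file and read its metadata in one job."""
    size_bytes = tar_path.stat().st_size
    with tarfile.open(tar_path, "r:") as tar:
        # Assumes the archive contains "./backup.json", as Supervisor backups do.
        json_file = tar.extractfile("./backup.json")
        return size_bytes, json_file.read()


async def load(tar_path: Path) -> tuple[int, bytes] | None:
    """Async half: one executor hop covers both size and metadata."""
    loop = asyncio.get_running_loop()
    try:
        return await loop.run_in_executor(None, _load_file, tar_path)
    except FileNotFoundError:
        return None  # replaces the former up-front is_file() check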


@@ -14,6 +14,7 @@ from ..const import (
     ATTR_DAYS_UNTIL_STALE,
     ATTR_PATH,
     ATTR_PROTECTED,
+    ATTR_SIZE_BYTES,
     FILE_HASSIO_BACKUPS,
     FOLDER_HOMEASSISTANT,
     CoreState,
@@ -258,7 +259,12 @@ class BackupManager(FileConfiguration, JobGroup):
             else:
                 backups[backup.slug] = Backup(
-                    self.coresys, tar_file, backup.slug, location_name, backup.data
+                    self.coresys,
+                    tar_file,
+                    backup.slug,
+                    location_name,
+                    backup.data,
+                    backup.size_bytes,
                 )
 
             return True
@@ -272,7 +278,9 @@ class BackupManager(FileConfiguration, JobGroup):
         tasks = [
             self.sys_create_task(_load_backup(_location, tar_file))
             for _location, path in locations.items()
-            for tar_file in self._list_backup_files(path)
+            for tar_file in await self.sys_run_in_executor(
+                self._list_backup_files, path
+            )
         ]
 
         _LOGGER.info("Found %d backup files", len(tasks))
@@ -305,7 +313,7 @@ class BackupManager(FileConfiguration, JobGroup):
         return True
 
-    def remove(
+    async def remove(
         self,
         backup: Backup,
         locations: list[LOCATION_TYPE] | None = None,
@@ -324,7 +332,7 @@ class BackupManager(FileConfiguration, JobGroup):
         for location in targets:
             backup_tarfile = backup.all_locations[location][ATTR_PATH]
             try:
-                backup_tarfile.unlink()
+                await self.sys_run_in_executor(backup_tarfile.unlink)
                 del backup.all_locations[location]
             except FileNotFoundError as err:
                 self.sys_create_task(self.reload(location))
@@ -397,7 +405,11 @@ class BackupManager(FileConfiguration, JobGroup):
         backup.all_locations.update(
             {
-                loc: {ATTR_PATH: path, ATTR_PROTECTED: backup.protected}
+                loc: {
+                    ATTR_PATH: path,
+                    ATTR_PROTECTED: backup.protected,
+                    ATTR_SIZE_BYTES: backup.size_bytes,
+                }
                 for loc, path in all_new_locations.items()
             }
         )
@@ -426,8 +438,7 @@ class BackupManager(FileConfiguration, JobGroup):
         tar_file = Path(self._get_base_path(location), f"{backup.slug}.tar")
 
         try:
-            backup.tarfile.rename(tar_file)
+            await self.sys_run_in_executor(backup.tarfile.rename, tar_file)
         except OSError as err:
             if err.errno == errno.EBADMSG and location in {LOCATION_CLOUD_BACKUP, None}:
                 self.sys_resolution.unhealthy = UnhealthyReason.OSERROR_BAD_MESSAGE
@@ -444,7 +455,7 @@ class BackupManager(FileConfiguration, JobGroup):
         )
 
         if not await backup.load():
             # Remove invalid backup from location it was moved to
-            backup.tarfile.unlink()
+            await self.sys_run_in_executor(backup.tarfile.unlink)
             return None
 
         _LOGGER.info("Successfully imported %s", backup.slug)


@@ -295,6 +295,7 @@ ATTR_SESSION_DATA_USER = "user"
 ATTR_SESSION_DATA_USER_ID = "user_id"
 ATTR_SIGNAL = "signal"
 ATTR_SIZE = "size"
+ATTR_SIZE_BYTES = "size_bytes"
 ATTR_SLUG = "slug"
 ATTR_SOURCE = "source"
 ATTR_SQUASH = "squash"


@@ -370,6 +370,6 @@ class Tasks(CoreSysAttributes):
         ]
         for backup in old_backups:
             try:
-                self.sys_backups.remove(backup, [LOCATION_CLOUD_BACKUP])
+                await self.sys_backups.remove(backup, [LOCATION_CLOUD_BACKUP])
             except BackupFileNotFoundError as err:
                 _LOGGER.debug("Can't remove backup %s: %s", backup.slug, err)


@@ -34,7 +34,7 @@ class FixupSystemClearFullBackup(FixupBase):
             : -1 * MINIMUM_FULL_BACKUPS
         ]:
             try:
-                self.sys_backups.remove(backup)
+                await self.sys_backups.remove(backup)
             except BackupFileNotFoundError as err:
                 _LOGGER.debug("Can't remove backup %s: %s", backup.slug, err)


@@ -571,7 +571,9 @@ async def test_cloud_backup_core_only(api_client: TestClient, mock_full_backup:
     assert resp.status == 403
 
     # pylint: disable-next=protected-access
-    mock_full_backup._locations = {".cloud_backup": {"path": None, "protected": False}}
+    mock_full_backup._locations = {
+        ".cloud_backup": {"path": None, "protected": False, "size_bytes": 10240}
+    }
     assert mock_full_backup.location == ".cloud_backup"
 
     resp = await api_client.post(f"/backups/{mock_full_backup.slug}/restore/full")
@@ -650,8 +652,8 @@ async def test_backup_to_multiple_locations(
     assert orig_backup.exists()
     assert copy_backup.exists()
     assert coresys.backups.get(slug).all_locations == {
-        None: {"path": orig_backup, "protected": False},
-        ".cloud_backup": {"path": copy_backup, "protected": False},
+        None: {"path": orig_backup, "protected": False, "size_bytes": 10240},
+        ".cloud_backup": {"path": copy_backup, "protected": False, "size_bytes": 10240},
     }
     assert coresys.backups.get(slug).location is None
@@ -711,8 +713,8 @@ async def test_upload_to_multiple_locations(
     assert orig_backup.exists()
     assert copy_backup.exists()
     assert coresys.backups.get("7fed74c8").all_locations == {
-        None: {"path": orig_backup, "protected": False},
-        ".cloud_backup": {"path": copy_backup, "protected": False},
+        None: {"path": orig_backup, "protected": False, "size_bytes": 10240},
+        ".cloud_backup": {"path": copy_backup, "protected": False, "size_bytes": 10240},
     }
     assert coresys.backups.get("7fed74c8").location is None
@@ -726,7 +728,7 @@ async def test_upload_duplicate_backup_new_location(
     orig_backup = Path(copy(backup_file, coresys.config.path_backup))
     await coresys.backups.reload()
     assert coresys.backups.get("7fed74c8").all_locations == {
-        None: {"path": orig_backup, "protected": False}
+        None: {"path": orig_backup, "protected": False, "size_bytes": 10240}
     }
 
     with backup_file.open("rb") as file, MultipartWriter("form-data") as mp:
@@ -743,8 +745,8 @@ async def test_upload_duplicate_backup_new_location(
     assert orig_backup.exists()
     assert copy_backup.exists()
     assert coresys.backups.get("7fed74c8").all_locations == {
-        None: {"path": orig_backup, "protected": False},
-        ".cloud_backup": {"path": copy_backup, "protected": False},
+        None: {"path": orig_backup, "protected": False, "size_bytes": 10240},
+        ".cloud_backup": {"path": copy_backup, "protected": False, "size_bytes": 10240},
    }
     assert coresys.backups.get("7fed74c8").location is None
@@ -781,7 +783,7 @@ async def test_upload_with_filename(
     orig_backup = coresys.config.path_backup / filename
     assert orig_backup.exists()
     assert coresys.backups.get("7fed74c8").all_locations == {
-        None: {"path": orig_backup, "protected": False}
+        None: {"path": orig_backup, "protected": False, "size_bytes": 10240}
     }
     assert coresys.backups.get("7fed74c8").location is None
@@ -814,8 +816,8 @@ async def test_remove_backup_from_location(api_client: TestClient, coresys: Core
     await coresys.backups.reload()
     assert (backup := coresys.backups.get("7fed74c8"))
     assert backup.all_locations == {
-        None: {"path": location_1, "protected": False},
-        ".cloud_backup": {"path": location_2, "protected": False},
+        None: {"path": location_1, "protected": False, "size_bytes": 10240},
+        ".cloud_backup": {"path": location_2, "protected": False, "size_bytes": 10240},
     }
 
     resp = await api_client.delete(
@@ -826,7 +828,9 @@ async def test_remove_backup_from_location(api_client: TestClient, coresys: Core
     assert location_1.exists()
     assert not location_2.exists()
     assert coresys.backups.get("7fed74c8")
-    assert backup.all_locations == {None: {"path": location_1, "protected": False}}
+    assert backup.all_locations == {
+        None: {"path": location_1, "protected": False, "size_bytes": 10240}
+    }
 
 
 @pytest.mark.usefixtures("tmp_supervisor_data")
@@ -838,7 +842,7 @@ async def test_remove_backup_file_not_found(api_client: TestClient, coresys: Cor
     await coresys.backups.reload()
     assert (backup := coresys.backups.get("7fed74c8"))
     assert backup.all_locations == {
-        None: {"path": location, "protected": False},
+        None: {"path": location, "protected": False, "size_bytes": 10240},
     }
 
     location.unlink()
@@ -866,8 +870,8 @@ async def test_download_backup_from_location(
     await coresys.backups.reload()
     assert (backup := coresys.backups.get("7fed74c8"))
     assert backup.all_locations == {
-        None: {"path": location_1, "protected": False},
-        ".cloud_backup": {"path": location_2, "protected": False},
+        None: {"path": location_1, "protected": False, "size_bytes": 10240},
+        ".cloud_backup": {"path": location_2, "protected": False, "size_bytes": 10240},
     }
 
     # The use case of this is user might want to pick a particular mount if one is flaky
@@ -981,8 +985,12 @@ async def test_restore_backup_unencrypted_after_encrypted(
     backup = coresys.backups.get("d9c48f8b")
     assert backup.all_locations == {
-        None: {"path": Path(enc_tar), "protected": True},
-        ".cloud_backup": {"path": Path(unc_tar), "protected": False},
+        None: {"path": Path(enc_tar), "protected": True, "size_bytes": 10240},
+        ".cloud_backup": {
+            "path": Path(unc_tar),
+            "protected": False,
+            "size_bytes": 10240,
+        },
     }
 
     # pylint: disable=fixme
@@ -1095,8 +1103,12 @@ async def test_backup_mixed_encryption(api_client: TestClient, coresys: CoreSys)
     backup = coresys.backups.get("d9c48f8b")
     assert backup.all_locations == {
-        None: {"path": Path(enc_tar), "protected": True},
-        ".cloud_backup": {"path": Path(unc_tar), "protected": False},
+        None: {"path": Path(enc_tar), "protected": True, "size_bytes": 10240},
+        ".cloud_backup": {
+            "path": Path(unc_tar),
+            "protected": False,
+            "size_bytes": 10240,
+        },
     }
 
     resp = await api_client.get("/backups")
@@ -1255,7 +1267,7 @@ async def test_missing_file_removes_location_from_cache(
     assert resp.status == 404
 
     # Wait for reload task to complete and confirm location is removed
-    await asyncio.sleep(0)
+    await asyncio.sleep(0.01)
     assert coresys.backups.get(slug).all_locations.keys() == {None}
@@ -1310,7 +1322,7 @@ async def test_missing_file_removes_backup_from_cache(
     assert resp.status == 404
 
     # Wait for reload task to complete and confirm backup is removed
-    await asyncio.sleep(0)
+    await asyncio.sleep(0.01)
     assert not coresys.backups.list_backups
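
The bumped sleeps above are a direct consequence of the executor move: a background task that awaits run_in_executor cannot finish within a single event-loop iteration, so asyncio.sleep(0), which yields exactly once, no longer guarantees the reload task has completed. A self-contained illustration (timings are scheduling-dependent; 0.01 s is ample for a no-op job):

import asyncio

done = False


def blocking_work() -> None:
    """Stands in for the stat/unlink calls now done off the loop."""


async def reload() -> None:
    global done
    # Suspends until a worker thread runs blocking_work and the loop
    # processes the completion callback: at least two loop iterations.
    await asyncio.get_running_loop().run_in_executor(None, blocking_work)
    done = True


async def main() -> None:
    task = asyncio.create_task(reload())
    await asyncio.sleep(0)     # one yield: reload() is still waiting
    print(done)                # False
    await asyncio.sleep(0.01)  # enough time for the thread-pool hop
    print(done)                # True, barring pathological scheduling
    await task


asyncio.run(main())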


@@ -47,7 +47,9 @@ async def test_consolidate_conflict_varied_encryption(
         f"Backup d9c48f8b exists in two files in locations None. Ignoring {enc_tar.as_posix()}"
         in caplog.text
     )
-    assert enc_backup.all_locations == {None: {"path": unc_tar, "protected": False}}
+    assert enc_backup.all_locations == {
+        None: {"path": unc_tar, "protected": False, "size_bytes": 10240}
+    }
 
 
 async def test_consolidate(
@@ -72,8 +74,8 @@ async def test_consolidate(
         not in caplog.text
     )
     assert enc_backup.all_locations == {
-        None: {"path": enc_tar, "protected": True},
-        "backup_test": {"path": unc_tar, "protected": False},
+        None: {"path": enc_tar, "protected": True, "size_bytes": 10240},
+        "backup_test": {"path": unc_tar, "protected": False, "size_bytes": 10240},
     }


@@ -1737,12 +1737,12 @@ async def test_backup_remove_error(
     err.errno = errno.EBUSY
     with pytest.raises(BackupError):
-        coresys.backups.remove(backup)
+        await coresys.backups.remove(backup)
     assert coresys.core.healthy is True
 
     err.errno = errno.EBADMSG
     with pytest.raises(BackupError):
-        coresys.backups.remove(backup)
+        await coresys.backups.remove(backup)
     assert coresys.core.healthy is healthy_expected
@@ -1986,11 +1986,11 @@ async def test_backup_remove_multiple_locations(coresys: CoreSys):
     await coresys.backups.reload()
     assert (backup := coresys.backups.get("7fed74c8"))
     assert backup.all_locations == {
-        None: {"path": location_1, "protected": False},
-        ".cloud_backup": {"path": location_2, "protected": False},
+        None: {"path": location_1, "protected": False, "size_bytes": 10240},
+        ".cloud_backup": {"path": location_2, "protected": False, "size_bytes": 10240},
     }
 
-    coresys.backups.remove(backup)
+    await coresys.backups.remove(backup)
     assert not location_1.exists()
     assert not location_2.exists()
     assert not coresys.backups.get("7fed74c8")
@@ -2006,15 +2006,17 @@ async def test_backup_remove_one_location_of_multiple(coresys: CoreSys):
     await coresys.backups.reload()
     assert (backup := coresys.backups.get("7fed74c8"))
     assert backup.all_locations == {
-        None: {"path": location_1, "protected": False},
-        ".cloud_backup": {"path": location_2, "protected": False},
+        None: {"path": location_1, "protected": False, "size_bytes": 10240},
+        ".cloud_backup": {"path": location_2, "protected": False, "size_bytes": 10240},
     }
 
-    coresys.backups.remove(backup, locations=[".cloud_backup"])
+    await coresys.backups.remove(backup, locations=[".cloud_backup"])
     assert location_1.exists()
     assert not location_2.exists()
     assert coresys.backups.get("7fed74c8")
-    assert backup.all_locations == {None: {"path": location_1, "protected": False}}
+    assert backup.all_locations == {
+        None: {"path": location_1, "protected": False, "size_bytes": 10240}
+    }
 
 
 @pytest.mark.usefixtures("tmp_supervisor_data")
@@ -2062,4 +2064,4 @@ async def test_remove_non_existing_backup_raises(
     err.errno = errno.ENOENT
     with pytest.raises(BackupFileNotFoundError):
-        coresys.backups.remove(backup)
+        await coresys.backups.remove(backup)


@@ -575,7 +575,9 @@ def install_addon_example(coresys: CoreSys, repository):
 @pytest.fixture
 async def mock_full_backup(coresys: CoreSys, tmp_path) -> Backup:
     """Mock a full backup."""
-    mock_backup = Backup(coresys, Path(tmp_path, "test_backup.tar"), "test", None)
+    mock_backup = Backup(
+        coresys, Path(tmp_path, "test_backup.tar"), "test", None, None, 10240
+    )
     mock_backup.new("Test", utcnow().isoformat(), BackupType.FULL)
     mock_backup.repositories = ["https://github.com/awesome-developer/awesome-repo"]
     mock_backup.docker = {}
@@ -600,7 +602,9 @@ async def mock_full_backup(coresys: CoreSys, tmp_path) -> Backup:
 @pytest.fixture
 async def mock_partial_backup(coresys: CoreSys, tmp_path) -> Backup:
     """Mock a partial backup."""
-    mock_backup = Backup(coresys, Path(tmp_path, "test_backup.tar"), "test", None)
+    mock_backup = Backup(
+        coresys, Path(tmp_path, "test_backup.tar"), "test", None, None, 10240
+    )
     mock_backup.new("Test", utcnow().isoformat(), BackupType.PARTIAL)
     mock_backup.repositories = ["https://github.com/awesome-developer/awesome-repo"]
     mock_backup.docker = {}