Backup protected status can vary per location (#5569)

* Backup protected status can vary per location

* Fix test_backup_remove_error test

* Update supervisor/backups/backup.py

* Add Docker registry configuration to backup metadata

* Make use of backup location fixture

* Address pylint

---------

Co-authored-by: Stefan Agner <stefan@agner.ch>
This commit is contained in:
Mike Degatano 2025-01-23 15:05:35 -05:00 committed by GitHub
parent 088832c253
commit 61a2101d8a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 230 additions and 42 deletions

View File

@ -34,6 +34,7 @@ from ..const import (
ATTR_LOCATION, ATTR_LOCATION,
ATTR_NAME, ATTR_NAME,
ATTR_PASSWORD, ATTR_PASSWORD,
ATTR_PATH,
ATTR_PROTECTED, ATTR_PROTECTED,
ATTR_REPOSITORIES, ATTR_REPOSITORIES,
ATTR_SIZE, ATTR_SIZE,
@ -55,6 +56,7 @@ from .const import (
ATTR_ADDITIONAL_LOCATIONS, ATTR_ADDITIONAL_LOCATIONS,
ATTR_BACKGROUND, ATTR_BACKGROUND,
ATTR_LOCATIONS, ATTR_LOCATIONS,
ATTR_PROTECTED_LOCATIONS,
ATTR_SIZE_BYTES, ATTR_SIZE_BYTES,
CONTENT_TYPE_TAR, CONTENT_TYPE_TAR,
) )
@ -165,6 +167,11 @@ class APIBackups(CoreSysAttributes):
ATTR_LOCATION: backup.location, ATTR_LOCATION: backup.location,
ATTR_LOCATIONS: backup.locations, ATTR_LOCATIONS: backup.locations,
ATTR_PROTECTED: backup.protected, ATTR_PROTECTED: backup.protected,
ATTR_PROTECTED_LOCATIONS: [
loc
for loc in backup.locations
if backup.all_locations[loc][ATTR_PROTECTED]
],
ATTR_COMPRESSED: backup.compressed, ATTR_COMPRESSED: backup.compressed,
ATTR_CONTENT: { ATTR_CONTENT: {
ATTR_HOMEASSISTANT: backup.homeassistant_version is not None, ATTR_HOMEASSISTANT: backup.homeassistant_version is not None,
@ -236,6 +243,11 @@ class APIBackups(CoreSysAttributes):
ATTR_SIZE_BYTES: backup.size_bytes, ATTR_SIZE_BYTES: backup.size_bytes,
ATTR_COMPRESSED: backup.compressed, ATTR_COMPRESSED: backup.compressed,
ATTR_PROTECTED: backup.protected, ATTR_PROTECTED: backup.protected,
ATTR_PROTECTED_LOCATIONS: [
loc
for loc in backup.locations
if backup.all_locations[loc][ATTR_PROTECTED]
],
ATTR_SUPERVISOR_VERSION: backup.supervisor_version, ATTR_SUPERVISOR_VERSION: backup.supervisor_version,
ATTR_HOMEASSISTANT: backup.homeassistant_version, ATTR_HOMEASSISTANT: backup.homeassistant_version,
ATTR_LOCATION: backup.location, ATTR_LOCATION: backup.location,
@ -460,7 +472,7 @@ class APIBackups(CoreSysAttributes):
raise APIError(f"Backup {backup.slug} is not in location {location}") raise APIError(f"Backup {backup.slug} is not in location {location}")
_LOGGER.info("Downloading backup %s", backup.slug) _LOGGER.info("Downloading backup %s", backup.slug)
filename = backup.all_locations[location] filename = backup.all_locations[location][ATTR_PATH]
response = web.FileResponse(filename) response = web.FileResponse(filename)
response.content_type = CONTENT_TYPE_TAR response.content_type = CONTENT_TYPE_TAR

View File

@ -53,6 +53,7 @@ ATTR_MODEL = "model"
ATTR_MOUNTS = "mounts" ATTR_MOUNTS = "mounts"
ATTR_MOUNT_POINTS = "mount_points" ATTR_MOUNT_POINTS = "mount_points"
ATTR_PANEL_PATH = "panel_path" ATTR_PANEL_PATH = "panel_path"
ATTR_PROTECTED_LOCATIONS = "protected_locations"
ATTR_REMOVABLE = "removable" ATTR_REMOVABLE = "removable"
ATTR_REMOVE_CONFIG = "remove_config" ATTR_REMOVE_CONFIG = "remove_config"
ATTR_REVISION = "revision" ATTR_REVISION = "revision"

View File

@ -39,6 +39,7 @@ from ..const import (
ATTR_HOMEASSISTANT, ATTR_HOMEASSISTANT,
ATTR_NAME, ATTR_NAME,
ATTR_PASSWORD, ATTR_PASSWORD,
ATTR_PATH,
ATTR_PROTECTED, ATTR_PROTECTED,
ATTR_REGISTRIES, ATTR_REGISTRIES,
ATTR_REPOSITORIES, ATTR_REPOSITORIES,
@ -91,7 +92,12 @@ class Backup(JobGroup):
self._outer_secure_tarfile: SecureTarFile | None = None self._outer_secure_tarfile: SecureTarFile | None = None
self._key: bytes | None = None self._key: bytes | None = None
self._aes: Cipher | None = None self._aes: Cipher | None = None
self._locations: dict[str | None, Path] = {location: tar_file} self._locations: dict[str | None, dict[str, Path | bool]] = {
location: {
ATTR_PATH: tar_file,
ATTR_PROTECTED: data.get(ATTR_PROTECTED, False) if data else False,
}
}
@property @property
def version(self) -> int: def version(self) -> int:
@ -121,7 +127,7 @@ class Backup(JobGroup):
@property @property
def protected(self) -> bool: def protected(self) -> bool:
"""Return backup date.""" """Return backup date."""
return self._data[ATTR_PROTECTED] return self._locations[self.location][ATTR_PROTECTED]
@property @property
def compressed(self) -> bool: def compressed(self) -> bool:
@ -198,7 +204,7 @@ class Backup(JobGroup):
return self.locations[0] return self.locations[0]
@property @property
def all_locations(self) -> dict[str | None, Path]: def all_locations(self) -> dict[str | None, dict[str, Path | bool]]:
"""Return all locations this backup was found in.""" """Return all locations this backup was found in."""
return self._locations return self._locations
@ -236,7 +242,7 @@ class Backup(JobGroup):
@property @property
def tarfile(self) -> Path: def tarfile(self) -> Path:
"""Return path to backup tarfile.""" """Return path to backup tarfile."""
return self._locations[self.location] return self._locations[self.location][ATTR_PATH]
@property @property
def is_current(self) -> bool: def is_current(self) -> bool:
@ -252,7 +258,27 @@ class Backup(JobGroup):
def __eq__(self, other: Any) -> bool: def __eq__(self, other: Any) -> bool:
"""Return true if backups have same metadata.""" """Return true if backups have same metadata."""
return isinstance(other, Backup) and self._data == other._data if not isinstance(other, Backup):
return False
# Compare all fields except ones about protection. Current encryption status does not affect equality
keys = self._data.keys() | other._data.keys()
for k in keys - {ATTR_PROTECTED, ATTR_CRYPTO, ATTR_DOCKER}:
if (
k not in self._data
or k not in other._data
or self._data[k] != other._data[k]
):
_LOGGER.debug(
"Backup %s and %s not equal because %s field has different value: %s and %s",
self.slug,
other.slug,
k,
self._data.get(k),
other._data.get(k),
)
return False
return True
def consolidate(self, backup: Self) -> None: def consolidate(self, backup: Self) -> None:
"""Consolidate two backups with same slug in different locations.""" """Consolidate two backups with same slug in different locations."""
@ -264,6 +290,20 @@ class Backup(JobGroup):
raise BackupInvalidError( raise BackupInvalidError(
f"Backup in {backup.location} and {self.location} both have slug {self.slug} but are not the same!" f"Backup in {backup.location} and {self.location} both have slug {self.slug} but are not the same!"
) )
# In case of conflict we always ignore the ones from the first one. But log them to let the user know
if conflict := {
loc: val[ATTR_PATH]
for loc, val in self.all_locations.items()
if loc in backup.all_locations and backup.all_locations[loc] != val
}:
_LOGGER.warning(
"Backup %s exists in two files in locations %s. Ignoring %s",
self.slug,
", ".join(str(loc) for loc in conflict),
", ".join([path.as_posix() for path in conflict.values()]),
)
self._locations.update(backup.all_locations) self._locations.update(backup.all_locations)
def new( def new(
@ -292,6 +332,7 @@ class Backup(JobGroup):
self._init_password(password) self._init_password(password)
self._data[ATTR_PROTECTED] = True self._data[ATTR_PROTECTED] = True
self._data[ATTR_CRYPTO] = CRYPTO_AES128 self._data[ATTR_CRYPTO] = CRYPTO_AES128
self._locations[self.location][ATTR_PROTECTED] = True
if not compressed: if not compressed:
self._data[ATTR_COMPRESSED] = False self._data[ATTR_COMPRESSED] = False
@ -418,6 +459,9 @@ class Backup(JobGroup):
) )
return False return False
if self._data[ATTR_PROTECTED]:
self._locations[self.location][ATTR_PROTECTED] = True
return True return True
@asynccontextmanager @asynccontextmanager
@ -452,7 +496,9 @@ class Backup(JobGroup):
) )
backup_tarfile = ( backup_tarfile = (
self.tarfile if location == DEFAULT else self.all_locations[location] self.tarfile
if location == DEFAULT
else self.all_locations[location][ATTR_PATH]
) )
if not backup_tarfile.is_file(): if not backup_tarfile.is_file():
raise BackupError( raise BackupError(

View File

@ -12,6 +12,8 @@ from shutil import copy
from ..addons.addon import Addon from ..addons.addon import Addon
from ..const import ( from ..const import (
ATTR_DAYS_UNTIL_STALE, ATTR_DAYS_UNTIL_STALE,
ATTR_PATH,
ATTR_PROTECTED,
FILE_HASSIO_BACKUPS, FILE_HASSIO_BACKUPS,
FOLDER_HOMEASSISTANT, FOLDER_HOMEASSISTANT,
CoreState, CoreState,
@ -291,7 +293,7 @@ class BackupManager(FileConfiguration, JobGroup):
) )
for location in targets: for location in targets:
try: try:
backup.all_locations[location].unlink() backup.all_locations[location][ATTR_PATH].unlink()
del backup.all_locations[location] del backup.all_locations[location]
except OSError as err: except OSError as err:
if err.errno == errno.EBADMSG and location in { if err.errno == errno.EBADMSG and location in {
@ -345,13 +347,20 @@ class BackupManager(FileConfiguration, JobGroup):
return all_locations return all_locations
try: try:
backup.all_locations.update( all_new_locations = await self.sys_run_in_executor(
await self.sys_run_in_executor(copy_to_additional_locations) copy_to_additional_locations
) )
except BackupDataDiskBadMessageError: except BackupDataDiskBadMessageError:
self.sys_resolution.unhealthy = UnhealthyReason.OSERROR_BAD_MESSAGE self.sys_resolution.unhealthy = UnhealthyReason.OSERROR_BAD_MESSAGE
raise raise
backup.all_locations.update(
{
loc: {ATTR_PATH: path, ATTR_PROTECTED: backup.protected}
for loc, path in all_new_locations.items()
}
)
@Job(name="backup_manager_import_backup") @Job(name="backup_manager_import_backup")
async def import_backup( async def import_backup(
self, self,
@ -676,6 +685,30 @@ class BackupManager(FileConfiguration, JobGroup):
_job_override__cleanup=False _job_override__cleanup=False
) )
async def _validate_location_password(
self,
backup: Backup,
password: str | None = None,
location: str | None | type[DEFAULT] = DEFAULT,
) -> None:
"""Validate location and password for backup, raise if invalid."""
if location != DEFAULT and location not in backup.all_locations:
raise BackupInvalidError(
f"Backup {backup.slug} does not exist in {location}", _LOGGER.error
)
if (
location == DEFAULT
and backup.protected
or location != DEFAULT
and backup.all_locations[location][ATTR_PROTECTED]
):
backup.set_password(password)
if not await backup.validate_password():
raise BackupInvalidError(
f"Invalid password for backup {backup.slug}", _LOGGER.error
)
@Job( @Job(
name=JOB_FULL_RESTORE, name=JOB_FULL_RESTORE,
conditions=[ conditions=[
@ -704,12 +737,7 @@ class BackupManager(FileConfiguration, JobGroup):
f"{backup.slug} is only a partial backup!", _LOGGER.error f"{backup.slug} is only a partial backup!", _LOGGER.error
) )
if backup.protected: await self._validate_location_password(backup, password, location)
backup.set_password(password)
if not await backup.validate_password():
raise BackupInvalidError(
f"Invalid password for backup {backup.slug}", _LOGGER.error
)
if backup.supervisor_version > self.sys_supervisor.version: if backup.supervisor_version > self.sys_supervisor.version:
raise BackupInvalidError( raise BackupInvalidError(
@ -774,12 +802,7 @@ class BackupManager(FileConfiguration, JobGroup):
folder_list.remove(FOLDER_HOMEASSISTANT) folder_list.remove(FOLDER_HOMEASSISTANT)
homeassistant = True homeassistant = True
if backup.protected: await self._validate_location_password(backup, password, location)
backup.set_password(password)
if not await backup.validate_password():
raise BackupInvalidError(
f"Invalid password for backup {backup.slug}", _LOGGER.error
)
if backup.homeassistant is None and homeassistant: if backup.homeassistant is None and homeassistant:
raise BackupInvalidError( raise BackupInvalidError(

View File

@ -544,7 +544,7 @@ async def test_cloud_backup_core_only(api_client: TestClient, mock_full_backup:
assert resp.status == 403 assert resp.status == 403
# pylint: disable-next=protected-access # pylint: disable-next=protected-access
mock_full_backup._locations = {".cloud_backup": None} mock_full_backup._locations = {".cloud_backup": {"path": None, "protected": False}}
assert mock_full_backup.location == ".cloud_backup" assert mock_full_backup.location == ".cloud_backup"
resp = await api_client.post(f"/backups/{mock_full_backup.slug}/restore/full") resp = await api_client.post(f"/backups/{mock_full_backup.slug}/restore/full")
@ -623,8 +623,8 @@ async def test_backup_to_multiple_locations(
assert orig_backup.exists() assert orig_backup.exists()
assert copy_backup.exists() assert copy_backup.exists()
assert coresys.backups.get(slug).all_locations == { assert coresys.backups.get(slug).all_locations == {
None: orig_backup, None: {"path": orig_backup, "protected": False},
".cloud_backup": copy_backup, ".cloud_backup": {"path": copy_backup, "protected": False},
} }
assert coresys.backups.get(slug).location is None assert coresys.backups.get(slug).location is None
@ -680,8 +680,8 @@ async def test_upload_to_multiple_locations(api_client: TestClient, coresys: Cor
assert orig_backup.exists() assert orig_backup.exists()
assert copy_backup.exists() assert copy_backup.exists()
assert coresys.backups.get("7fed74c8").all_locations == { assert coresys.backups.get("7fed74c8").all_locations == {
None: orig_backup, None: {"path": orig_backup, "protected": False},
".cloud_backup": copy_backup, ".cloud_backup": {"path": copy_backup, "protected": False},
} }
assert coresys.backups.get("7fed74c8").location is None assert coresys.backups.get("7fed74c8").location is None
@ -694,7 +694,9 @@ async def test_upload_duplicate_backup_new_location(
backup_file = get_fixture_path("backup_example.tar") backup_file = get_fixture_path("backup_example.tar")
orig_backup = Path(copy(backup_file, coresys.config.path_backup)) orig_backup = Path(copy(backup_file, coresys.config.path_backup))
await coresys.backups.reload(None, "backup_example.tar") await coresys.backups.reload(None, "backup_example.tar")
assert coresys.backups.get("7fed74c8").all_locations == {None: orig_backup} assert coresys.backups.get("7fed74c8").all_locations == {
None: {"path": orig_backup, "protected": False}
}
with backup_file.open("rb") as file, MultipartWriter("form-data") as mp: with backup_file.open("rb") as file, MultipartWriter("form-data") as mp:
mp.append(file) mp.append(file)
@ -710,8 +712,8 @@ async def test_upload_duplicate_backup_new_location(
assert orig_backup.exists() assert orig_backup.exists()
assert copy_backup.exists() assert copy_backup.exists()
assert coresys.backups.get("7fed74c8").all_locations == { assert coresys.backups.get("7fed74c8").all_locations == {
None: orig_backup, None: {"path": orig_backup, "protected": False},
".cloud_backup": copy_backup, ".cloud_backup": {"path": copy_backup, "protected": False},
} }
assert coresys.backups.get("7fed74c8").location is None assert coresys.backups.get("7fed74c8").location is None
@ -743,7 +745,10 @@ async def test_remove_backup_from_location(api_client: TestClient, coresys: Core
await coresys.backups.reload() await coresys.backups.reload()
assert (backup := coresys.backups.get("7fed74c8")) assert (backup := coresys.backups.get("7fed74c8"))
assert backup.all_locations == {None: location_1, ".cloud_backup": location_2} assert backup.all_locations == {
None: {"path": location_1, "protected": False},
".cloud_backup": {"path": location_2, "protected": False},
}
resp = await api_client.delete( resp = await api_client.delete(
"/backups/7fed74c8", json={"location": ".cloud_backup"} "/backups/7fed74c8", json={"location": ".cloud_backup"}
@ -753,7 +758,7 @@ async def test_remove_backup_from_location(api_client: TestClient, coresys: Core
assert location_1.exists() assert location_1.exists()
assert not location_2.exists() assert not location_2.exists()
assert coresys.backups.get("7fed74c8") assert coresys.backups.get("7fed74c8")
assert backup.all_locations == {None: location_1} assert backup.all_locations == {None: {"path": location_1, "protected": False}}
async def test_download_backup_from_location( async def test_download_backup_from_location(
@ -766,7 +771,10 @@ async def test_download_backup_from_location(
await coresys.backups.reload() await coresys.backups.reload()
assert (backup := coresys.backups.get("7fed74c8")) assert (backup := coresys.backups.get("7fed74c8"))
assert backup.all_locations == {None: location_1, ".cloud_backup": location_2} assert backup.all_locations == {
None: {"path": location_1, "protected": False},
".cloud_backup": {"path": location_2, "protected": False},
}
# The use case of this is user might want to pick a particular mount if one is flaky # The use case of this is user might want to pick a particular mount if one is flaky
# To simulate this, remove the file from one location and show one works and the other doesn't # To simulate this, remove the file from one location and show one works and the other doesn't
@ -839,7 +847,7 @@ async def test_restore_backup_from_location(
# The use case of this is user might want to pick a particular mount if one is flaky # The use case of this is user might want to pick a particular mount if one is flaky
# To simulate this, remove the file from one location and show one works and the other doesn't # To simulate this, remove the file from one location and show one works and the other doesn't
assert backup.location is None assert backup.location is None
backup.all_locations[None].unlink() backup.all_locations[None]["path"].unlink()
test_file.unlink() test_file.unlink()
resp = await api_client.post( resp = await api_client.post(
@ -850,7 +858,7 @@ async def test_restore_backup_from_location(
body = await resp.json() body = await resp.json()
assert ( assert (
body["message"] body["message"]
== f"Cannot open backup at {backup.all_locations[None].as_posix()}, file does not exist!" == f"Cannot open backup at {backup.all_locations[None]['path'].as_posix()}, file does not exist!"
) )
resp = await api_client.post( resp = await api_client.post(
@ -914,3 +922,65 @@ async def test_restore_homeassistant_adds_env(
] ]
== job.uuid == job.uuid
) )
@pytest.mark.usefixtures("tmp_supervisor_data")
async def test_backup_mixed_encryption(api_client: TestClient, coresys: CoreSys):
"""Test a backup with mixed encryption status across locations."""
enc_tar = copy(get_fixture_path("test_consolidate.tar"), coresys.config.path_backup)
unc_tar = copy(
get_fixture_path("test_consolidate_unc.tar"), coresys.config.path_core_backup
)
await coresys.backups.reload()
backup = coresys.backups.get("d9c48f8b")
assert backup.all_locations == {
None: {"path": Path(enc_tar), "protected": True},
".cloud_backup": {"path": Path(unc_tar), "protected": False},
}
resp = await api_client.get("/backups")
assert resp.status == 200
body = await resp.json()
assert body["data"]["backups"][0]["slug"] == "d9c48f8b"
assert body["data"]["backups"][0]["location"] is None
assert body["data"]["backups"][0]["locations"] == [None]
assert body["data"]["backups"][0]["protected"] is True
assert body["data"]["backups"][0]["protected_locations"] == [None]
@pytest.mark.parametrize(
("backup_type", "options"), [("full", {}), ("partial", {"folders": ["ssl"]})]
)
@pytest.mark.usefixtures("tmp_supervisor_data", "path_extern")
async def test_protected_backup(
api_client: TestClient, coresys: CoreSys, backup_type: str, options: dict[str, Any]
):
"""Test creating a protected backup."""
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
resp = await api_client.post(
f"/backups/new/{backup_type}",
json={"name": "test", "password": "test"} | options,
)
assert resp.status == 200
body = await resp.json()
assert (slug := body["data"]["slug"])
resp = await api_client.get("/backups")
assert resp.status == 200
body = await resp.json()
assert body["data"]["backups"][0]["slug"] == slug
assert body["data"]["backups"][0]["location"] is None
assert body["data"]["backups"][0]["locations"] == [None]
assert body["data"]["backups"][0]["protected"] is True
assert body["data"]["backups"][0]["protected_locations"] == [None]
resp = await api_client.get(f"/backups/{slug}/info")
assert resp.status == 200
body = await resp.json()
assert body["data"]["location"] is None
assert body["data"]["locations"] == [None]
assert body["data"]["protected"] is True
assert body["data"]["protected_locations"] == [None]

View File

@ -2,11 +2,16 @@
from os import listdir from os import listdir
from pathlib import Path from pathlib import Path
from shutil import copy
import pytest
from supervisor.backups.backup import Backup from supervisor.backups.backup import Backup
from supervisor.backups.const import BackupType from supervisor.backups.const import BackupType
from supervisor.coresys import CoreSys from supervisor.coresys import CoreSys
from tests.common import get_fixture_path
async def test_new_backup_stays_in_folder(coresys: CoreSys, tmp_path: Path): async def test_new_backup_stays_in_folder(coresys: CoreSys, tmp_path: Path):
"""Test making a new backup operates entirely within folder where backup will be stored.""" """Test making a new backup operates entirely within folder where backup will be stored."""
@ -20,3 +25,23 @@ async def test_new_backup_stays_in_folder(coresys: CoreSys, tmp_path: Path):
assert len(listdir(tmp_path)) == 1 assert len(listdir(tmp_path)) == 1
assert backup.tarfile.exists() assert backup.tarfile.exists()
async def test_consolidate_conflict_varied_encryption(
coresys: CoreSys, tmp_path: Path, caplog: pytest.LogCaptureFixture
):
"""Test consolidate with two backups in same location and varied encryption."""
enc_tar = Path(copy(get_fixture_path("test_consolidate.tar"), tmp_path))
enc_backup = Backup(coresys, enc_tar, "test", None)
await enc_backup.load()
unc_tar = Path(copy(get_fixture_path("test_consolidate_unc.tar"), tmp_path))
unc_backup = Backup(coresys, unc_tar, "test", None)
await unc_backup.load()
enc_backup.consolidate(unc_backup)
assert (
f"Backup d9c48f8b exists in two files in locations None. Ignoring {enc_tar.as_posix()}"
in caplog.text
)
assert enc_backup.all_locations == {None: {"path": unc_tar, "protected": False}}

View File

@ -1756,12 +1756,17 @@ async def test_backup_remove_error(
healthy_expected: bool, healthy_expected: bool,
): ):
"""Test removing a backup error.""" """Test removing a backup error."""
copy(get_fixture_path("backup_example.tar"), coresys.config.path_backup) location: LOCATION_TYPE = backup_locations[0]
await coresys.backups.reload(location=None, filename="backup_example.tar") backup_base_path = coresys.backups._get_base_path(location) # pylint: disable=protected-access
backup_base_path.mkdir(exist_ok=True)
copy(get_fixture_path("backup_example.tar"), backup_base_path)
await coresys.backups.reload(location=location, filename="backup_example.tar")
assert (backup := coresys.backups.get("7fed74c8")) assert (backup := coresys.backups.get("7fed74c8"))
backup.all_locations[location_name] = (tar_mock := MagicMock()) assert location_name in backup.all_locations
tar_mock.unlink.side_effect = (err := OSError()) backup.all_locations[location_name]["path"] = (tar_file_mock := MagicMock())
tar_file_mock.unlink.side_effect = (err := OSError())
err.errno = errno.EBUSY err.errno = errno.EBUSY
assert coresys.backups.remove(backup) is False assert coresys.backups.remove(backup) is False
@ -2011,7 +2016,10 @@ async def test_backup_remove_multiple_locations(coresys: CoreSys):
await coresys.backups.reload() await coresys.backups.reload()
assert (backup := coresys.backups.get("7fed74c8")) assert (backup := coresys.backups.get("7fed74c8"))
assert backup.all_locations == {None: location_1, ".cloud_backup": location_2} assert backup.all_locations == {
None: {"path": location_1, "protected": False},
".cloud_backup": {"path": location_2, "protected": False},
}
coresys.backups.remove(backup) coresys.backups.remove(backup)
assert not location_1.exists() assert not location_1.exists()
@ -2028,13 +2036,16 @@ async def test_backup_remove_one_location_of_multiple(coresys: CoreSys):
await coresys.backups.reload() await coresys.backups.reload()
assert (backup := coresys.backups.get("7fed74c8")) assert (backup := coresys.backups.get("7fed74c8"))
assert backup.all_locations == {None: location_1, ".cloud_backup": location_2} assert backup.all_locations == {
None: {"path": location_1, "protected": False},
".cloud_backup": {"path": location_2, "protected": False},
}
coresys.backups.remove(backup, locations=[".cloud_backup"]) coresys.backups.remove(backup, locations=[".cloud_backup"])
assert location_1.exists() assert location_1.exists()
assert not location_2.exists() assert not location_2.exists()
assert coresys.backups.get("7fed74c8") assert coresys.backups.get("7fed74c8")
assert backup.all_locations == {None: location_1} assert backup.all_locations == {None: {"path": location_1, "protected": False}}
@pytest.mark.usefixtures("tmp_supervisor_data") @pytest.mark.usefixtures("tmp_supervisor_data")

BIN
tests/fixtures/test_consolidate.tar vendored Normal file

Binary file not shown.

BIN
tests/fixtures/test_consolidate_unc.tar vendored Normal file

Binary file not shown.