mirror of
https://github.com/home-assistant/supervisor.git
synced 2025-11-09 10:59:43 +00:00
Fix and extend cloud backup support (#5464)
* Fix and extend cloud backup support * Clean up task for cloud backup and remove by location * Args to kwargs on backup methods * Fix backup remove error test and typing clean up
This commit is contained in:
@@ -15,7 +15,7 @@ from supervisor.addons.addon import Addon
|
||||
from supervisor.addons.const import AddonBackupMode
|
||||
from supervisor.addons.model import AddonModel
|
||||
from supervisor.backups.backup import Backup
|
||||
from supervisor.backups.const import BackupType
|
||||
from supervisor.backups.const import LOCATION_TYPE, BackupType
|
||||
from supervisor.backups.manager import BackupManager
|
||||
from supervisor.const import FOLDER_HOMEASSISTANT, FOLDER_SHARE, AddonState, CoreState
|
||||
from supervisor.coresys import CoreSys
|
||||
@@ -34,7 +34,6 @@ from supervisor.homeassistant.api import HomeAssistantAPI
|
||||
from supervisor.homeassistant.const import WSType
|
||||
from supervisor.homeassistant.core import HomeAssistantCore
|
||||
from supervisor.homeassistant.module import HomeAssistant
|
||||
from supervisor.jobs import JobSchedulerOptions
|
||||
from supervisor.jobs.const import JobCondition
|
||||
from supervisor.mounts.mount import Mount
|
||||
from supervisor.utils.json import read_json_file, write_json_file
|
||||
@@ -1718,29 +1717,35 @@ async def test_skip_homeassistant_database(
|
||||
assert not test_db_shm.exists()
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("tmp_supervisor_data", "path_extern")
|
||||
@pytest.mark.parametrize(
|
||||
"tar_parent,healthy_expected",
|
||||
("backup_locations", "location_name", "healthy_expected"),
|
||||
[
|
||||
(Path("/data/mounts/test"), True),
|
||||
(Path("/data/backup"), False),
|
||||
(["test"], "test", True),
|
||||
([None], None, False),
|
||||
],
|
||||
indirect=["backup_locations"],
|
||||
)
|
||||
def test_backup_remove_error(
|
||||
async def test_backup_remove_error(
|
||||
coresys: CoreSys,
|
||||
full_backup_mock: Backup,
|
||||
tar_parent: Path,
|
||||
backup_locations: list[LOCATION_TYPE],
|
||||
location_name: str | None,
|
||||
healthy_expected: bool,
|
||||
):
|
||||
"""Test removing a backup error."""
|
||||
full_backup_mock.tarfile.unlink.side_effect = (err := OSError())
|
||||
full_backup_mock.tarfile.parent = tar_parent
|
||||
copy(get_fixture_path("backup_example.tar"), coresys.config.path_backup)
|
||||
await coresys.backups.reload(location=None, filename="backup_example.tar")
|
||||
assert (backup := coresys.backups.get("7fed74c8"))
|
||||
|
||||
backup.all_locations[location_name] = (tar_mock := MagicMock())
|
||||
tar_mock.unlink.side_effect = (err := OSError())
|
||||
|
||||
err.errno = errno.EBUSY
|
||||
assert coresys.backups.remove(full_backup_mock) is False
|
||||
assert coresys.backups.remove(backup) is False
|
||||
assert coresys.core.healthy is True
|
||||
|
||||
err.errno = errno.EBADMSG
|
||||
assert coresys.backups.remove(full_backup_mock) is False
|
||||
assert coresys.backups.remove(backup) is False
|
||||
assert coresys.core.healthy is healthy_expected
|
||||
|
||||
|
||||
@@ -1900,7 +1905,7 @@ async def test_reload_multiple_locations(coresys: CoreSys, tmp_supervisor_data:
|
||||
assert (backup := coresys.backups.get("7fed74c8"))
|
||||
assert backup.location == ".cloud_backup"
|
||||
assert backup.locations == [".cloud_backup"]
|
||||
assert backup.all_locations == {".cloud_backup"}
|
||||
assert backup.all_locations.keys() == {".cloud_backup"}
|
||||
|
||||
copy(backup_file, tmp_supervisor_data / "backup")
|
||||
await coresys.backups.reload()
|
||||
@@ -1909,7 +1914,7 @@ async def test_reload_multiple_locations(coresys: CoreSys, tmp_supervisor_data:
|
||||
assert (backup := coresys.backups.get("7fed74c8"))
|
||||
assert backup.location is None
|
||||
assert backup.locations == [None]
|
||||
assert backup.all_locations == {".cloud_backup", None}
|
||||
assert backup.all_locations.keys() == {".cloud_backup", None}
|
||||
|
||||
copy(backup_file, mount_dir)
|
||||
await coresys.backups.reload()
|
||||
@@ -1919,7 +1924,7 @@ async def test_reload_multiple_locations(coresys: CoreSys, tmp_supervisor_data:
|
||||
assert backup.location in {None, "backup_test"}
|
||||
assert None in backup.locations
|
||||
assert "backup_test" in backup.locations
|
||||
assert backup.all_locations == {".cloud_backup", None, "backup_test"}
|
||||
assert backup.all_locations.keys() == {".cloud_backup", None, "backup_test"}
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("mount_propagation", "mock_is_mount", "path_extern")
|
||||
@@ -1951,7 +1956,7 @@ async def test_partial_reload_multiple_locations(
|
||||
assert (backup := coresys.backups.get("7fed74c8"))
|
||||
assert backup.location == ".cloud_backup"
|
||||
assert backup.locations == [".cloud_backup"]
|
||||
assert backup.all_locations == {".cloud_backup"}
|
||||
assert backup.all_locations.keys() == {".cloud_backup"}
|
||||
|
||||
copy(backup_file, tmp_supervisor_data / "backup")
|
||||
await coresys.backups.reload(location=None, filename="backup_example.tar")
|
||||
@@ -1960,7 +1965,7 @@ async def test_partial_reload_multiple_locations(
|
||||
assert (backup := coresys.backups.get("7fed74c8"))
|
||||
assert backup.location is None
|
||||
assert backup.locations == [None]
|
||||
assert backup.all_locations == {".cloud_backup", None}
|
||||
assert backup.all_locations.keys() == {".cloud_backup", None}
|
||||
|
||||
copy(backup_file, mount_dir)
|
||||
await coresys.backups.reload(location=mount, filename="backup_example.tar")
|
||||
@@ -1968,66 +1973,42 @@ async def test_partial_reload_multiple_locations(
|
||||
assert coresys.backups.list_backups
|
||||
assert (backup := coresys.backups.get("7fed74c8"))
|
||||
assert backup.location is None
|
||||
assert None in backup.locations
|
||||
assert "backup_test" in backup.locations
|
||||
assert backup.all_locations == {".cloud_backup", None, "backup_test"}
|
||||
assert backup.locations == [None, "backup_test"]
|
||||
assert backup.all_locations.keys() == {".cloud_backup", None, "backup_test"}
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("location", "folder"), [(None, "backup"), (".cloud_backup", "cloud_backup")]
|
||||
)
|
||||
@pytest.mark.usefixtures("tmp_supervisor_data")
|
||||
async def test_partial_backup_complete_ws_message(
|
||||
coresys: CoreSys, ha_ws_client: AsyncMock, location: str | None, folder: str
|
||||
async def test_backup_remove_multiple_locations(
|
||||
coresys: CoreSys, tmp_supervisor_data: Path
|
||||
):
|
||||
"""Test WS message notifies core when a partial backup is complete."""
|
||||
coresys.core.state = CoreState.RUNNING
|
||||
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
|
||||
ha_ws_client.ha_version = AwesomeVersion("2025.12.0")
|
||||
"""Test removing a backup that exists in multiple locations."""
|
||||
backup_file = get_fixture_path("backup_example.tar")
|
||||
location_1 = Path(copy(backup_file, coresys.config.path_backup))
|
||||
location_2 = Path(copy(backup_file, coresys.config.path_core_backup))
|
||||
|
||||
# Test a partial backup
|
||||
job, backup_task = coresys.jobs.schedule_job(
|
||||
coresys.backups.do_backup_partial,
|
||||
JobSchedulerOptions(),
|
||||
"test",
|
||||
folders=["media"],
|
||||
location=location,
|
||||
)
|
||||
backup: Backup = await backup_task
|
||||
await coresys.backups.reload()
|
||||
assert (backup := coresys.backups.get("7fed74c8"))
|
||||
assert backup.all_locations == {None: location_1, ".cloud_backup": location_2}
|
||||
|
||||
assert ha_ws_client.async_send_command.call_args_list[-3].args[0] == {
|
||||
"type": "backup/supervisor/backup_complete",
|
||||
"data": {
|
||||
"job_id": job.uuid,
|
||||
"slug": backup.slug,
|
||||
"path": f"/{folder}/{backup.slug}.tar",
|
||||
},
|
||||
}
|
||||
coresys.backups.remove(backup)
|
||||
assert not location_1.exists()
|
||||
assert not location_2.exists()
|
||||
assert not coresys.backups.get("7fed74c8")
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("location", "folder"), [(None, "backup"), (".cloud_backup", "cloud_backup")]
|
||||
)
|
||||
@pytest.mark.usefixtures("tmp_supervisor_data")
|
||||
async def test_full_backup_complete_ws_message(
|
||||
coresys: CoreSys, ha_ws_client: AsyncMock, location: str | None, folder: str
|
||||
async def test_backup_remove_one_location_of_multiple(
|
||||
coresys: CoreSys, tmp_supervisor_data: Path
|
||||
):
|
||||
"""Test WS message notifies core when a full backup is complete."""
|
||||
coresys.core.state = CoreState.RUNNING
|
||||
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
|
||||
ha_ws_client.ha_version = AwesomeVersion("2025.12.0")
|
||||
"""Test removing a backup that exists in multiple locations from one location."""
|
||||
backup_file = get_fixture_path("backup_example.tar")
|
||||
location_1 = Path(copy(backup_file, coresys.config.path_backup))
|
||||
location_2 = Path(copy(backup_file, coresys.config.path_core_backup))
|
||||
|
||||
# Test a full backup
|
||||
job, backup_task = coresys.jobs.schedule_job(
|
||||
coresys.backups.do_backup_full, JobSchedulerOptions(), "test", location=location
|
||||
)
|
||||
backup: Backup = await backup_task
|
||||
await coresys.backups.reload()
|
||||
assert (backup := coresys.backups.get("7fed74c8"))
|
||||
assert backup.all_locations == {None: location_1, ".cloud_backup": location_2}
|
||||
|
||||
assert ha_ws_client.async_send_command.call_args_list[-3].args[0] == {
|
||||
"type": "backup/supervisor/backup_complete",
|
||||
"data": {
|
||||
"job_id": job.uuid,
|
||||
"slug": backup.slug,
|
||||
"path": f"/{folder}/{backup.slug}.tar",
|
||||
},
|
||||
}
|
||||
coresys.backups.remove(backup, locations=[".cloud_backup"])
|
||||
assert location_1.exists()
|
||||
assert not location_2.exists()
|
||||
assert coresys.backups.get("7fed74c8")
|
||||
assert backup.all_locations == {None: location_1}
|
||||
|
||||
Reference in New Issue
Block a user