mirror of
https://github.com/home-assistant/supervisor.git
synced 2025-07-08 17:56:33 +00:00
Improve backup upload location determination (#5848)
* Improve backup upload location determination For local backup upload locations, check if the location is on the same file system an thuse allows to move the backup file after upload. This allows custom backup mounts. Currently there is no documented, persistent way to create such mounts in with Home Assistant OS installations, but since we might add local mounts in the future this seems a worthwhile addition. Fixes: #5837 * Fix pytests
This commit is contained in:
parent
8e714072c2
commit
c1b45406d6
@ -53,7 +53,6 @@ from ..coresys import CoreSysAttributes
|
||||
from ..exceptions import APIError, APIForbidden, APINotFound
|
||||
from ..jobs import JobSchedulerOptions, SupervisorJob
|
||||
from ..mounts.const import MountUsage
|
||||
from ..mounts.mount import Mount
|
||||
from ..resolution.const import UnhealthyReason
|
||||
from .const import (
|
||||
ATTR_ADDITIONAL_LOCATIONS,
|
||||
@ -495,7 +494,7 @@ class APIBackups(CoreSysAttributes):
|
||||
"""Upload a backup file."""
|
||||
location: LOCATION_TYPE = None
|
||||
locations: list[LOCATION_TYPE] | None = None
|
||||
tmp_path = self.sys_config.path_tmp
|
||||
|
||||
if ATTR_LOCATION in request.query:
|
||||
location_names: list[str] = request.query.getall(ATTR_LOCATION, [])
|
||||
self._validate_cloud_backup_location(
|
||||
@ -510,9 +509,6 @@ class APIBackups(CoreSysAttributes):
|
||||
]
|
||||
location = locations.pop(0)
|
||||
|
||||
if location and location != LOCATION_CLOUD_BACKUP:
|
||||
tmp_path = cast(Mount, location).local_where
|
||||
|
||||
filename: str | None = None
|
||||
if ATTR_FILENAME in request.query:
|
||||
filename = request.query.get(ATTR_FILENAME)
|
||||
@ -521,13 +517,14 @@ class APIBackups(CoreSysAttributes):
|
||||
except vol.Invalid as ex:
|
||||
raise APIError(humanize_error(filename, ex)) from None
|
||||
|
||||
tmp_path = await self.sys_backups.get_upload_path_for_location(location)
|
||||
temp_dir: TemporaryDirectory | None = None
|
||||
backup_file_stream: IOBase | None = None
|
||||
|
||||
def open_backup_file() -> Path:
|
||||
nonlocal temp_dir, backup_file_stream
|
||||
temp_dir = TemporaryDirectory(dir=tmp_path.as_posix())
|
||||
tar_file = Path(temp_dir.name, "backup.tar")
|
||||
tar_file = Path(temp_dir.name, "upload.tar")
|
||||
backup_file_stream = tar_file.open("wb")
|
||||
return tar_file
|
||||
|
||||
|
@ -122,6 +122,25 @@ class BackupManager(FileConfiguration, JobGroup):
|
||||
|
||||
return self.sys_config.path_backup
|
||||
|
||||
async def get_upload_path_for_location(self, location: LOCATION_TYPE) -> Path:
|
||||
"""Get a path (temporary) upload path for a backup location."""
|
||||
target_path = self._get_base_path(location)
|
||||
|
||||
# Return target path for mounts since tmp will always be local, mounts
|
||||
# will never be the same device.
|
||||
if location is not None and location != LOCATION_CLOUD_BACKUP:
|
||||
return target_path
|
||||
|
||||
tmp_path = self.sys_config.path_tmp
|
||||
|
||||
def check_same_mount() -> bool:
|
||||
"""Check if the target path is on the same mount as the backup location."""
|
||||
return target_path.stat().st_dev == tmp_path.stat().st_dev
|
||||
|
||||
if await self.sys_run_in_executor(check_same_mount):
|
||||
return tmp_path
|
||||
return target_path
|
||||
|
||||
async def _check_location(self, location: LOCATION_TYPE | type[DEFAULT] = DEFAULT):
|
||||
"""Check if backup location is accessible."""
|
||||
if location == DEFAULT and self.sys_mounts.default_backup_mount:
|
||||
|
@ -5,6 +5,7 @@ import errno
|
||||
from functools import partial
|
||||
from pathlib import Path
|
||||
from shutil import copy, rmtree
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import ANY, AsyncMock, MagicMock, Mock, PropertyMock, patch
|
||||
|
||||
from awesomeversion import AwesomeVersion
|
||||
@ -2122,3 +2123,63 @@ async def test_backup_multiple_locations_oserror(
|
||||
assert (
|
||||
UnhealthyReason.OSERROR_BAD_MESSAGE in coresys.resolution.unhealthy
|
||||
) is unhealthy
|
||||
|
||||
|
||||
@pytest.mark.parametrize("same_mount", [True, False])
|
||||
async def test_get_upload_path_for_backup_location(
|
||||
coresys: CoreSys,
|
||||
same_mount,
|
||||
):
|
||||
"""Test get_upload_path_for_location with local backup location."""
|
||||
manager = BackupManager(coresys)
|
||||
|
||||
target_path = coresys.config.path_backup
|
||||
tmp_path = coresys.config.path_tmp
|
||||
|
||||
def make_stat_mock(target_path: Path, tmp_path: Path, same_mount: bool):
|
||||
def _mock_stat(self):
|
||||
if self == target_path:
|
||||
return SimpleNamespace(st_dev=1)
|
||||
if self == tmp_path:
|
||||
return SimpleNamespace(st_dev=1 if same_mount else 2)
|
||||
raise ValueError(f"Unexpected path: {self}")
|
||||
|
||||
return _mock_stat
|
||||
|
||||
with patch(
|
||||
"pathlib.Path.stat", new=make_stat_mock(target_path, tmp_path, same_mount)
|
||||
):
|
||||
result = await manager.get_upload_path_for_location(None)
|
||||
|
||||
if same_mount:
|
||||
assert result == tmp_path
|
||||
else:
|
||||
assert result == target_path
|
||||
|
||||
|
||||
async def test_get_upload_path_for_mount_location(
|
||||
coresys: CoreSys,
|
||||
tmp_supervisor_data,
|
||||
path_extern,
|
||||
mount_propagation,
|
||||
mock_is_mount,
|
||||
):
|
||||
"""Test get_upload_path_for_location with a Mount location."""
|
||||
manager = BackupManager(coresys)
|
||||
|
||||
await coresys.mounts.load()
|
||||
mount = Mount.from_dict(
|
||||
coresys,
|
||||
{
|
||||
"name": "test_mount",
|
||||
"usage": "backup",
|
||||
"type": "cifs",
|
||||
"server": "server.local",
|
||||
"share": "test",
|
||||
},
|
||||
)
|
||||
await coresys.mounts.create_mount(mount)
|
||||
|
||||
result = await manager.get_upload_path_for_location(mount)
|
||||
|
||||
assert result == mount.local_where
|
||||
|
Loading…
x
Reference in New Issue
Block a user