Add sizes per location and support .local (#5581)

This commit is contained in:
Mike Degatano 2025-01-28 05:41:51 -05:00 committed by GitHub
parent 257e2ceb82
commit c8f1b222c0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 106 additions and 60 deletions

View File

@ -56,8 +56,8 @@ from ..resolution.const import UnhealthyReason
from .const import ( from .const import (
ATTR_ADDITIONAL_LOCATIONS, ATTR_ADDITIONAL_LOCATIONS,
ATTR_BACKGROUND, ATTR_BACKGROUND,
ATTR_LOCATION_ATTRIBUTES,
ATTR_LOCATIONS, ATTR_LOCATIONS,
ATTR_PROTECTED_LOCATIONS,
ATTR_SIZE_BYTES, ATTR_SIZE_BYTES,
CONTENT_TYPE_TAR, CONTENT_TYPE_TAR,
) )
@ -67,6 +67,8 @@ _LOGGER: logging.Logger = logging.getLogger(__name__)
ALL_ADDONS_FLAG = "ALL" ALL_ADDONS_FLAG = "ALL"
LOCATION_LOCAL = ".local"
RE_SLUGIFY_NAME = re.compile(r"[^A-Za-z0-9]+") RE_SLUGIFY_NAME = re.compile(r"[^A-Za-z0-9]+")
RE_BACKUP_FILENAME = re.compile(r"^[^\\\/]+\.tar$") RE_BACKUP_FILENAME = re.compile(r"^[^\\\/]+\.tar$")
@ -82,12 +84,23 @@ def _ensure_list(item: Any) -> list:
return item return item
def _convert_local_location(item: str | None) -> str | None:
"""Convert local location value."""
if item in {LOCATION_LOCAL, ""}:
return None
return item
# pylint: disable=no-value-for-parameter # pylint: disable=no-value-for-parameter
SCHEMA_FOLDERS = vol.All([vol.In(_ALL_FOLDERS)], vol.Unique())
SCHEMA_LOCATION = vol.All(vol.Maybe(str), _convert_local_location)
SCHEMA_LOCATION_LIST = vol.All(_ensure_list, [SCHEMA_LOCATION], vol.Unique())
SCHEMA_RESTORE_FULL = vol.Schema( SCHEMA_RESTORE_FULL = vol.Schema(
{ {
vol.Optional(ATTR_PASSWORD): vol.Maybe(str), vol.Optional(ATTR_PASSWORD): vol.Maybe(str),
vol.Optional(ATTR_BACKGROUND, default=False): vol.Boolean(), vol.Optional(ATTR_BACKGROUND, default=False): vol.Boolean(),
vol.Optional(ATTR_LOCATION): vol.Maybe(str), vol.Optional(ATTR_LOCATION): SCHEMA_LOCATION,
} }
) )
@ -95,7 +108,7 @@ SCHEMA_RESTORE_PARTIAL = SCHEMA_RESTORE_FULL.extend(
{ {
vol.Optional(ATTR_HOMEASSISTANT): vol.Boolean(), vol.Optional(ATTR_HOMEASSISTANT): vol.Boolean(),
vol.Optional(ATTR_ADDONS): vol.All([str], vol.Unique()), vol.Optional(ATTR_ADDONS): vol.All([str], vol.Unique()),
vol.Optional(ATTR_FOLDERS): vol.All([vol.In(_ALL_FOLDERS)], vol.Unique()), vol.Optional(ATTR_FOLDERS): SCHEMA_FOLDERS,
} }
) )
@ -105,9 +118,7 @@ SCHEMA_BACKUP_FULL = vol.Schema(
vol.Optional(ATTR_FILENAME): vol.Match(RE_BACKUP_FILENAME), vol.Optional(ATTR_FILENAME): vol.Match(RE_BACKUP_FILENAME),
vol.Optional(ATTR_PASSWORD): vol.Maybe(str), vol.Optional(ATTR_PASSWORD): vol.Maybe(str),
vol.Optional(ATTR_COMPRESSED): vol.Maybe(vol.Boolean()), vol.Optional(ATTR_COMPRESSED): vol.Maybe(vol.Boolean()),
vol.Optional(ATTR_LOCATION): vol.All( vol.Optional(ATTR_LOCATION): SCHEMA_LOCATION_LIST,
_ensure_list, [vol.Maybe(str)], vol.Unique()
),
vol.Optional(ATTR_HOMEASSISTANT_EXCLUDE_DATABASE): vol.Boolean(), vol.Optional(ATTR_HOMEASSISTANT_EXCLUDE_DATABASE): vol.Boolean(),
vol.Optional(ATTR_BACKGROUND, default=False): vol.Boolean(), vol.Optional(ATTR_BACKGROUND, default=False): vol.Boolean(),
vol.Optional(ATTR_EXTRA): dict, vol.Optional(ATTR_EXTRA): dict,
@ -119,30 +130,14 @@ SCHEMA_BACKUP_PARTIAL = SCHEMA_BACKUP_FULL.extend(
vol.Optional(ATTR_ADDONS): vol.Or( vol.Optional(ATTR_ADDONS): vol.Or(
ALL_ADDONS_FLAG, vol.All([str], vol.Unique()) ALL_ADDONS_FLAG, vol.All([str], vol.Unique())
), ),
vol.Optional(ATTR_FOLDERS): vol.All([vol.In(_ALL_FOLDERS)], vol.Unique()), vol.Optional(ATTR_FOLDERS): SCHEMA_FOLDERS,
vol.Optional(ATTR_HOMEASSISTANT): vol.Boolean(), vol.Optional(ATTR_HOMEASSISTANT): vol.Boolean(),
} }
) )
SCHEMA_OPTIONS = vol.Schema( SCHEMA_OPTIONS = vol.Schema({vol.Optional(ATTR_DAYS_UNTIL_STALE): days_until_stale})
{ SCHEMA_FREEZE = vol.Schema({vol.Optional(ATTR_TIMEOUT): vol.All(int, vol.Range(min=1))})
vol.Optional(ATTR_DAYS_UNTIL_STALE): days_until_stale, SCHEMA_REMOVE = vol.Schema({vol.Optional(ATTR_LOCATION): SCHEMA_LOCATION_LIST})
}
)
SCHEMA_FREEZE = vol.Schema(
{
vol.Optional(ATTR_TIMEOUT): vol.All(int, vol.Range(min=1)),
}
)
SCHEMA_REMOVE = vol.Schema(
{
vol.Optional(ATTR_LOCATION): vol.All(
_ensure_list, [vol.Maybe(str)], vol.Unique()
),
}
)
class APIBackups(CoreSysAttributes): class APIBackups(CoreSysAttributes):
@ -155,6 +150,16 @@ class APIBackups(CoreSysAttributes):
raise APINotFound("Backup does not exist") raise APINotFound("Backup does not exist")
return backup return backup
def _make_location_attributes(self, backup: Backup) -> dict[str, dict[str, Any]]:
"""Make location attributes dictionary."""
return {
loc if loc else LOCATION_LOCAL: {
ATTR_PROTECTED: backup.all_locations[loc][ATTR_PROTECTED],
ATTR_SIZE_BYTES: backup.location_size(loc),
}
for loc in backup.locations
}
def _list_backups(self): def _list_backups(self):
"""Return list of backups.""" """Return list of backups."""
return [ return [
@ -168,11 +173,7 @@ class APIBackups(CoreSysAttributes):
ATTR_LOCATION: backup.location, ATTR_LOCATION: backup.location,
ATTR_LOCATIONS: backup.locations, ATTR_LOCATIONS: backup.locations,
ATTR_PROTECTED: backup.protected, ATTR_PROTECTED: backup.protected,
ATTR_PROTECTED_LOCATIONS: [ ATTR_LOCATION_ATTRIBUTES: self._make_location_attributes(backup),
loc
for loc in backup.locations
if backup.all_locations[loc][ATTR_PROTECTED]
],
ATTR_COMPRESSED: backup.compressed, ATTR_COMPRESSED: backup.compressed,
ATTR_CONTENT: { ATTR_CONTENT: {
ATTR_HOMEASSISTANT: backup.homeassistant_version is not None, ATTR_HOMEASSISTANT: backup.homeassistant_version is not None,
@ -244,11 +245,7 @@ class APIBackups(CoreSysAttributes):
ATTR_SIZE_BYTES: backup.size_bytes, ATTR_SIZE_BYTES: backup.size_bytes,
ATTR_COMPRESSED: backup.compressed, ATTR_COMPRESSED: backup.compressed,
ATTR_PROTECTED: backup.protected, ATTR_PROTECTED: backup.protected,
ATTR_PROTECTED_LOCATIONS: [ ATTR_LOCATION_ATTRIBUTES: self._make_location_attributes(backup),
loc
for loc in backup.locations
if backup.all_locations[loc][ATTR_PROTECTED]
],
ATTR_SUPERVISOR_VERSION: backup.supervisor_version, ATTR_SUPERVISOR_VERSION: backup.supervisor_version,
ATTR_HOMEASSISTANT: backup.homeassistant_version, ATTR_HOMEASSISTANT: backup.homeassistant_version,
ATTR_LOCATION: backup.location, ATTR_LOCATION: backup.location,
@ -467,7 +464,9 @@ class APIBackups(CoreSysAttributes):
"""Download a backup file.""" """Download a backup file."""
backup = self._extract_slug(request) backup = self._extract_slug(request)
# Query will give us '' for /backups, convert value to None # Query will give us '' for /backups, convert value to None
location = request.query.get(ATTR_LOCATION, backup.location) or None location = _convert_local_location(
request.query.get(ATTR_LOCATION, backup.location)
)
self._validate_cloud_backup_location(request, location) self._validate_cloud_backup_location(request, location)
if location not in backup.all_locations: if location not in backup.all_locations:
raise APIError(f"Backup {backup.slug} is not in location {location}") raise APIError(f"Backup {backup.slug} is not in location {location}")
@ -496,7 +495,9 @@ class APIBackups(CoreSysAttributes):
self._validate_cloud_backup_location(request, location_names) self._validate_cloud_backup_location(request, location_names)
# Convert empty string to None if necessary # Convert empty string to None if necessary
locations = [ locations = [
self._location_to_mount(location) if location else None self._location_to_mount(location)
if _convert_local_location(location)
else None
for location in location_names for location in location_names
] ]
location = locations.pop(0) location = locations.pop(0)

View File

@ -47,13 +47,13 @@ ATTR_JOBS = "jobs"
ATTR_LLMNR = "llmnr" ATTR_LLMNR = "llmnr"
ATTR_LLMNR_HOSTNAME = "llmnr_hostname" ATTR_LLMNR_HOSTNAME = "llmnr_hostname"
ATTR_LOCAL_ONLY = "local_only" ATTR_LOCAL_ONLY = "local_only"
ATTR_LOCATION_ATTRIBUTES = "location_attributes"
ATTR_LOCATIONS = "locations" ATTR_LOCATIONS = "locations"
ATTR_MDNS = "mdns" ATTR_MDNS = "mdns"
ATTR_MODEL = "model" ATTR_MODEL = "model"
ATTR_MOUNTS = "mounts" ATTR_MOUNTS = "mounts"
ATTR_MOUNT_POINTS = "mount_points" ATTR_MOUNT_POINTS = "mount_points"
ATTR_PANEL_PATH = "panel_path" ATTR_PANEL_PATH = "panel_path"
ATTR_PROTECTED_LOCATIONS = "protected_locations"
ATTR_REMOVABLE = "removable" ATTR_REMOVABLE = "removable"
ATTR_REMOVE_CONFIG = "remove_config" ATTR_REMOVE_CONFIG = "remove_config"
ATTR_REVISION = "revision" ATTR_REVISION = "revision"

View File

@ -7,7 +7,7 @@ from collections.abc import AsyncGenerator, Awaitable
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from copy import deepcopy from copy import deepcopy
from datetime import timedelta from datetime import timedelta
from functools import cached_property from functools import lru_cache
import io import io
import json import json
import logging import logging
@ -67,6 +67,12 @@ from .validate import SCHEMA_BACKUP
_LOGGER: logging.Logger = logging.getLogger(__name__) _LOGGER: logging.Logger = logging.getLogger(__name__)
@lru_cache
def _backup_file_size(backup: Path) -> int:
"""Get backup file size."""
return backup.stat().st_size if backup.is_file() else 0
def location_sort_key(value: str | None) -> str: def location_sort_key(value: str | None) -> str:
"""Sort locations, None is always first else alphabetical.""" """Sort locations, None is always first else alphabetical."""
return value if value else "" return value if value else ""
@ -222,17 +228,15 @@ class Backup(JobGroup):
key=location_sort_key, key=location_sort_key,
) )
@cached_property @property
def size(self) -> float: def size(self) -> float:
"""Return backup size.""" """Return backup size."""
return round(self.size_bytes / 1048576, 2) # calc mbyte return round(self.size_bytes / 1048576, 2) # calc mbyte
@cached_property @property
def size_bytes(self) -> int: def size_bytes(self) -> int:
"""Return backup size in bytes.""" """Return backup size in bytes."""
if not self.tarfile.is_file(): return self.location_size(self.location)
return 0
return self.tarfile.stat().st_size
@property @property
def is_new(self) -> bool: def is_new(self) -> bool:
@ -256,6 +260,14 @@ class Backup(JobGroup):
"""Returns a copy of the data.""" """Returns a copy of the data."""
return deepcopy(self._data) return deepcopy(self._data)
def location_size(self, location: str | None) -> int:
"""Get size of backup in a location."""
if location not in self.all_locations:
return 0
backup = self.all_locations[location][ATTR_PATH]
return _backup_file_size(backup)
def __eq__(self, other: Any) -> bool: def __eq__(self, other: Any) -> bool:
"""Return true if backups have same metadata.""" """Return true if backups have same metadata."""
if not isinstance(other, Backup): if not isinstance(other, Backup):

View File

@ -100,8 +100,13 @@ async def test_options(api_client: TestClient, coresys: CoreSys):
@pytest.mark.parametrize( @pytest.mark.parametrize(
"location,backup_dir", ("location", "backup_dir", "api_location"),
[("backup_test", PurePath("mounts", "backup_test")), (None, PurePath("backup"))], [
("backup_test", PurePath("mounts", "backup_test"), "backup_test"),
(None, PurePath("backup"), None),
("", PurePath("backup"), None),
(".local", PurePath("backup"), None),
],
) )
@pytest.mark.usefixtures("path_extern", "mount_propagation", "mock_is_mount") @pytest.mark.usefixtures("path_extern", "mount_propagation", "mock_is_mount")
async def test_backup_to_location( async def test_backup_to_location(
@ -109,6 +114,7 @@ async def test_backup_to_location(
coresys: CoreSys, coresys: CoreSys,
location: str | None, location: str | None,
backup_dir: PurePath, backup_dir: PurePath,
api_location: str | None,
tmp_supervisor_data: Path, tmp_supervisor_data: Path,
): ):
"""Test making a backup to a specific location with default mount.""" """Test making a backup to a specific location with default mount."""
@ -145,7 +151,7 @@ async def test_backup_to_location(
resp = await api_client.get(f"/backups/{slug}/info") resp = await api_client.get(f"/backups/{slug}/info")
result = await resp.json() result = await resp.json()
assert result["result"] == "ok" assert result["result"] == "ok"
assert result["data"]["location"] == location assert result["data"]["location"] == api_location
@pytest.mark.usefixtures( @pytest.mark.usefixtures(
@ -661,14 +667,18 @@ async def test_backup_with_extras(
@pytest.mark.usefixtures("tmp_supervisor_data") @pytest.mark.usefixtures("tmp_supervisor_data")
async def test_upload_to_multiple_locations(api_client: TestClient, coresys: CoreSys): @pytest.mark.parametrize("local_location", ["", ".local"])
async def test_upload_to_multiple_locations(
api_client: TestClient, coresys: CoreSys, local_location: str
):
"""Test uploading a backup to multiple locations.""" """Test uploading a backup to multiple locations."""
backup_file = get_fixture_path("backup_example.tar") backup_file = get_fixture_path("backup_example.tar")
with backup_file.open("rb") as file, MultipartWriter("form-data") as mp: with backup_file.open("rb") as file, MultipartWriter("form-data") as mp:
mp.append(file) mp.append(file)
resp = await api_client.post( resp = await api_client.post(
"/backups/new/upload?location=&location=.cloud_backup", data=mp f"/backups/new/upload?location={local_location}&location=.cloud_backup",
data=mp,
) )
assert resp.status == 200 assert resp.status == 200
@ -798,8 +808,12 @@ async def test_remove_backup_from_location(api_client: TestClient, coresys: Core
assert backup.all_locations == {None: {"path": location_1, "protected": False}} assert backup.all_locations == {None: {"path": location_1, "protected": False}}
@pytest.mark.parametrize("local_location", ["", ".local"])
async def test_download_backup_from_location( async def test_download_backup_from_location(
api_client: TestClient, coresys: CoreSys, tmp_supervisor_data: Path api_client: TestClient,
coresys: CoreSys,
tmp_supervisor_data: Path,
local_location: str,
): ):
"""Test downloading a backup from a specific location.""" """Test downloading a backup from a specific location."""
backup_file = get_fixture_path("backup_example.tar") backup_file = get_fixture_path("backup_example.tar")
@ -816,12 +830,12 @@ async def test_download_backup_from_location(
# The use case of this is user might want to pick a particular mount if one is flaky # The use case of this is user might want to pick a particular mount if one is flaky
# To simulate this, remove the file from one location and show one works and the other doesn't # To simulate this, remove the file from one location and show one works and the other doesn't
assert backup.location is None assert backup.location is None
location_1.unlink() location_2.unlink()
resp = await api_client.get("/backups/7fed74c8/download?location=")
assert resp.status == 404
resp = await api_client.get("/backups/7fed74c8/download?location=.cloud_backup") resp = await api_client.get("/backups/7fed74c8/download?location=.cloud_backup")
assert resp.status == 404
resp = await api_client.get(f"/backups/7fed74c8/download?location={local_location}")
assert resp.status == 200 assert resp.status == 200
out_file = tmp_supervisor_data / "backup_example.tar" out_file = tmp_supervisor_data / "backup_example.tar"
with out_file.open("wb") as out: with out_file.open("wb") as out:
@ -859,8 +873,12 @@ async def test_partial_backup_all_addons(
store_addons.assert_called_once_with([install_addon_ssh]) store_addons.assert_called_once_with([install_addon_ssh])
@pytest.mark.parametrize("local_location", [None, "", ".local"])
async def test_restore_backup_from_location( async def test_restore_backup_from_location(
api_client: TestClient, coresys: CoreSys, tmp_supervisor_data: Path api_client: TestClient,
coresys: CoreSys,
tmp_supervisor_data: Path,
local_location: str | None,
): ):
"""Test restoring a backup from a specific location.""" """Test restoring a backup from a specific location."""
coresys.core.state = CoreState.RUNNING coresys.core.state = CoreState.RUNNING
@ -889,7 +907,7 @@ async def test_restore_backup_from_location(
resp = await api_client.post( resp = await api_client.post(
f"/backups/{backup.slug}/restore/partial", f"/backups/{backup.slug}/restore/partial",
json={"location": None, "folders": ["share"]}, json={"location": local_location, "folders": ["share"]},
) )
assert resp.status == 400 assert resp.status == 400
body = await resp.json() body = await resp.json()
@ -983,7 +1001,12 @@ async def test_backup_mixed_encryption(api_client: TestClient, coresys: CoreSys)
assert body["data"]["backups"][0]["location"] is None assert body["data"]["backups"][0]["location"] is None
assert body["data"]["backups"][0]["locations"] == [None] assert body["data"]["backups"][0]["locations"] == [None]
assert body["data"]["backups"][0]["protected"] is True assert body["data"]["backups"][0]["protected"] is True
assert body["data"]["backups"][0]["protected_locations"] == [None] assert body["data"]["backups"][0]["location_attributes"] == {
".local": {
"protected": True,
"size_bytes": 10240,
}
}
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -1012,7 +1035,12 @@ async def test_protected_backup(
assert body["data"]["backups"][0]["location"] is None assert body["data"]["backups"][0]["location"] is None
assert body["data"]["backups"][0]["locations"] == [None] assert body["data"]["backups"][0]["locations"] == [None]
assert body["data"]["backups"][0]["protected"] is True assert body["data"]["backups"][0]["protected"] is True
assert body["data"]["backups"][0]["protected_locations"] == [None] assert body["data"]["backups"][0]["location_attributes"] == {
".local": {
"protected": True,
"size_bytes": 10240,
}
}
resp = await api_client.get(f"/backups/{slug}/info") resp = await api_client.get(f"/backups/{slug}/info")
assert resp.status == 200 assert resp.status == 200
@ -1020,4 +1048,9 @@ async def test_protected_backup(
assert body["data"]["location"] is None assert body["data"]["location"] is None
assert body["data"]["locations"] == [None] assert body["data"]["locations"] == [None]
assert body["data"]["protected"] is True assert body["data"]["protected"] is True
assert body["data"]["protected_locations"] == [None] assert body["data"]["location_attributes"] == {
".local": {
"protected": True,
"size_bytes": 10240,
}
}