Handle non-existing file in Backup password check too (#5599)

* Handle non-existing file in Backup password check too

Make sure we handle a non-existing backup file also when validating
the password.

* Update supervisor/backups/manager.py

Co-authored-by: Mike Degatano <michael.degatano@gmail.com>

* Add test case and fix password check when multiple locations

* Mock default backup unprotected by default

Instead of setting the protected property which we might not use
everywhere, simply mock the default backup to be unprotected.

* Fix mock of protected backup

* Introduce test for validate_password

Testing showed that validate_password doesn't return anything. Extend
tests to cover this case and fix the actual code.

---------

Co-authored-by: Mike Degatano <michael.degatano@gmail.com>
This commit is contained in:
Stefan Agner 2025-02-04 11:23:05 +01:00 committed by GitHub
parent 4c04f364a3
commit 58df65541c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 125 additions and 30 deletions

View File

@ -393,16 +393,17 @@ class Backup(JobGroup):
data = padder.update(decrypt.update(b64decode(data))) + padder.finalize()
return data.decode()
async def validate_password(self) -> bool:
async def validate_password(self, location: str | None) -> bool:
"""Validate backup password.
Returns false only when the password is known to be wrong.
"""
backup_file: Path = self.all_locations[location][ATTR_PATH]
def _validate_file() -> bool:
ending = f".tar{'.gz' if self.compressed else ''}"
with tarfile.open(self.tarfile, "r:") as backup:
with tarfile.open(backup_file, "r:") as backup:
test_tar_name = next(
(
entry.name
@ -433,7 +434,14 @@ class Backup(JobGroup):
_LOGGER.exception("Unexpected error validating password")
return True
return await self.sys_run_in_executor(_validate_file)
try:
return await self.sys_run_in_executor(_validate_file)
except FileNotFoundError as err:
self.sys_create_task(self.sys_backups.reload(location))
raise BackupFileNotFoundError(
f"Cannot validate backup at {backup_file.as_posix()}, file does not exist!",
_LOGGER.error,
) from err
async def load(self):
"""Read backup.json from tar file."""

View File

@ -720,14 +720,10 @@ class BackupManager(FileConfiguration, JobGroup):
f"Backup {backup.slug} does not exist in {location}", _LOGGER.error
)
if (
location == DEFAULT
and backup.protected
or location != DEFAULT
and backup.all_locations[location][ATTR_PROTECTED]
):
location = location if location != DEFAULT else backup.location
if backup.all_locations[location][ATTR_PROTECTED]:
backup.set_password(password)
if not await backup.validate_password():
if not await backup.validate_password(location):
raise BackupInvalidError(
f"Invalid password for backup {backup.slug}", _LOGGER.error
)

View File

@ -485,7 +485,9 @@ async def test_restore_immediate_errors(
assert "Must update supervisor" in (await resp.json())["message"]
with (
patch.object(Backup, "protected", new=PropertyMock(return_value=True)),
patch.object(
Backup, "all_locations", new={None: {"path": None, "protected": True}}
),
patch.object(Backup, "validate_password", return_value=False),
):
resp = await api_client.post(
@ -1120,15 +1122,32 @@ async def test_upload_to_mount(api_client: TestClient, coresys: CoreSys):
@pytest.mark.parametrize(
("method", "url_path", "body"),
("method", "url_path", "body", "backup_file"),
[
("delete", "/backups/7fed74c8", {"location": ".cloud_backup"}),
(
"delete",
"/backups/7fed74c8",
{"location": ".cloud_backup"},
"backup_example.tar",
),
(
"post",
"/backups/7fed74c8/restore/partial",
{"location": ".cloud_backup", "folders": ["ssl"]},
"backup_example.tar",
),
(
"get",
"/backups/7fed74c8/download?location=.cloud_backup",
None,
"backup_example.tar",
),
(
"post",
"/backups/93b462f8/restore/partial",
{"location": ".cloud_backup", "folders": ["ssl"], "password": "bad"},
"backup_example_enc.tar",
),
("get", "/backups/7fed74c8/download?location=.cloud_backup", None),
],
)
@pytest.mark.usefixtures("tmp_supervisor_data")
@ -1138,19 +1157,22 @@ async def test_missing_file_removes_location_from_cache(
method: str,
url_path: str,
body: dict[str, Any] | None,
backup_file: str,
):
"""Test finding a missing file removes the location from cache."""
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
backup_file = get_fixture_path("backup_example.tar")
backup_file = get_fixture_path(backup_file)
copy(backup_file, coresys.config.path_backup)
bad_location = Path(copy(backup_file, coresys.config.path_core_backup))
await coresys.backups.reload()
assert len(coresys.backups.list_backups) == 1
slug = list(coresys.backups.list_backups)[0].slug
# After reload, remove one of the file and confirm we have an out of date cache
bad_location.unlink()
assert coresys.backups.get("7fed74c8").all_locations.keys() == {
assert coresys.backups.get(slug).all_locations.keys() == {
None,
".cloud_backup",
}
@ -1160,19 +1182,31 @@ async def test_missing_file_removes_location_from_cache(
# Wait for reload task to complete and confirm location is removed
await asyncio.sleep(0)
assert coresys.backups.get("7fed74c8").all_locations.keys() == {None}
assert coresys.backups.get(slug).all_locations.keys() == {None}
@pytest.mark.parametrize(
("method", "url_path", "body"),
("method", "url_path", "body", "backup_file"),
[
("delete", "/backups/7fed74c8", {"location": ".local"}),
("delete", "/backups/7fed74c8", {"location": ".local"}, "backup_example.tar"),
(
"post",
"/backups/7fed74c8/restore/partial",
{"location": ".local", "folders": ["ssl"]},
"backup_example.tar",
),
(
"get",
"/backups/7fed74c8/download?location=.local",
None,
"backup_example.tar",
),
(
"post",
"/backups/93b462f8/restore/partial",
{"location": ".local", "folders": ["ssl"], "password": "bad"},
"backup_example_enc.tar",
),
("get", "/backups/7fed74c8/download?location=.local", None),
],
)
@pytest.mark.usefixtures("tmp_supervisor_data")
@ -1182,18 +1216,21 @@ async def test_missing_file_removes_backup_from_cache(
method: str,
url_path: str,
body: dict[str, Any] | None,
backup_file: str,
):
"""Test finding a missing file removes the backup from cache if its the only one."""
coresys.core.state = CoreState.RUNNING
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
backup_file = get_fixture_path("backup_example.tar")
backup_file = get_fixture_path(backup_file)
bad_location = Path(copy(backup_file, coresys.config.path_backup))
await coresys.backups.reload()
assert len(coresys.backups.list_backups) == 1
slug = list(coresys.backups.list_backups)[0].slug
# After reload, remove one of the file and confirm we have an out of date cache
bad_location.unlink()
assert coresys.backups.get("7fed74c8").all_locations.keys() == {None}
assert coresys.backups.get(slug).all_locations.keys() == {None}
resp = await api_client.request(method, url_path, json=body)
assert resp.status == 404

View File

@ -40,6 +40,8 @@ def partial_backup_mock(backup_mock):
backup_instance.folders = []
backup_instance.addon_list = [TEST_ADDON_SLUG]
backup_instance.supervisor_version = "9999.09.9.dev9999"
backup_instance.location = None
backup_instance.all_locations = {None: {"protected": False}}
yield backup_mock
@ -51,6 +53,8 @@ def full_backup_mock(backup_mock):
backup_instance.folders = ALL_FOLDERS
backup_instance.addon_list = [TEST_ADDON_SLUG]
backup_instance.supervisor_version = "9999.09.9.dev9999"
backup_instance.location = None
backup_instance.all_locations = {None: {"protected": False}}
yield backup_mock

View File

@ -3,12 +3,15 @@
from os import listdir
from pathlib import Path
from shutil import copy
import tarfile
from unittest.mock import MagicMock, patch
import pytest
from supervisor.backups.backup import Backup
from supervisor.backups.const import BackupType
from supervisor.coresys import CoreSys
from supervisor.exceptions import BackupFileNotFoundError
from tests.common import get_fixture_path
@ -72,3 +75,54 @@ async def test_consolidate(
None: {"path": enc_tar, "protected": True},
"backup_test": {"path": unc_tar, "protected": False},
}
@pytest.mark.asyncio
@pytest.mark.parametrize(
"tarfile_side_effect, securetar_side_effect, expected_result, expect_exception",
[
(None, None, True, False), # Successful validation
(FileNotFoundError, None, None, True), # File not found
(None, tarfile.ReadError, False, False), # Invalid password
],
)
async def test_validate_password(
coresys: CoreSys,
tmp_path: Path,
tarfile_side_effect,
securetar_side_effect,
expected_result,
expect_exception,
):
"""Parameterized test for validate_password."""
enc_tar = Path(copy(get_fixture_path("backup_example_enc.tar"), tmp_path))
enc_backup = Backup(coresys, enc_tar, "test", None)
await enc_backup.load()
backup_tar_mock = MagicMock()
backup_tar_mock.getmembers.return_value = [
MagicMock(name="test.tar.gz")
] # Fake tar entries
backup_tar_mock.extractfile.return_value = MagicMock()
backup_context_mock = MagicMock()
backup_context_mock.__enter__.return_value = backup_tar_mock
backup_context_mock.__exit__.return_value = False
with (
patch(
"tarfile.open",
MagicMock(
return_value=backup_context_mock, side_effect=tarfile_side_effect
),
),
patch(
"supervisor.backups.backup.SecureTarFile",
MagicMock(side_effect=securetar_side_effect),
),
):
if expect_exception:
with pytest.raises(BackupFileNotFoundError):
await enc_backup.validate_password(None)
else:
result = await enc_backup.validate_password(None)
assert result == expected_result

View File

@ -232,7 +232,6 @@ async def test_do_restore_full(coresys: CoreSys, full_backup_mock, install_addon
manager = BackupManager(coresys)
backup_instance = full_backup_mock.return_value
backup_instance.protected = False
backup_instance.sys_addons = coresys.addons
backup_instance.remove_delta_addons = partial(
Backup.remove_delta_addons, backup_instance
@ -265,7 +264,6 @@ async def test_do_restore_full_different_addon(
manager = BackupManager(coresys)
backup_instance = full_backup_mock.return_value
backup_instance.protected = False
backup_instance.addon_list = ["differentslug"]
backup_instance.sys_addons = coresys.addons
backup_instance.remove_delta_addons = partial(
@ -298,7 +296,6 @@ async def test_do_restore_partial_minimal(
manager = BackupManager(coresys)
backup_instance = partial_backup_mock.return_value
backup_instance.protected = False
assert await manager.do_restore_partial(backup_instance, homeassistant=False)
backup_instance.restore_homeassistant.assert_not_called()
@ -323,7 +320,6 @@ async def test_do_restore_partial_maximal(coresys: CoreSys, partial_backup_mock)
manager = BackupManager(coresys)
backup_instance = partial_backup_mock.return_value
backup_instance.protected = False
assert await manager.do_restore_partial(
backup_instance,
addons=[TEST_ADDON_SLUG],
@ -356,13 +352,13 @@ async def test_fail_invalid_full_backup(
await manager.do_restore_full(partial_backup_mock.return_value)
backup_instance = full_backup_mock.return_value
backup_instance.protected = True
backup_instance.all_locations[None]["protected"] = True
backup_instance.validate_password = AsyncMock(return_value=False)
with pytest.raises(BackupInvalidError):
await manager.do_restore_full(backup_instance)
backup_instance.protected = False
backup_instance.all_locations[None]["protected"] = False
backup_instance.supervisor_version = "2022.08.4"
with (
patch.object(
@ -385,13 +381,13 @@ async def test_fail_invalid_partial_backup(
manager = BackupManager(coresys)
backup_instance = partial_backup_mock.return_value
backup_instance.protected = True
backup_instance.all_locations[None]["protected"] = True
backup_instance.validate_password = AsyncMock(return_value=False)
with pytest.raises(BackupInvalidError):
await manager.do_restore_partial(backup_instance)
backup_instance.protected = False
backup_instance.all_locations[None]["protected"] = False
backup_instance.homeassistant = None
with pytest.raises(BackupInvalidError):

BIN
tests/fixtures/backup_example_enc.tar vendored Normal file

Binary file not shown.