Handle OS errors on backup create (#5662)

* Handle permission error on backup create

Make sure we handle (write) permission errors when creating a backup.

* Introduce BackupFileExistError and BackupPermissionError exceptions

* Make error messages a bit more uniform

* Drop use of exclusive mode

SecureTar does not handle exclusive mode nicely. Drop use of it for now.
This commit is contained in:
Stefan Agner 2025-02-24 21:34:23 +01:00 committed by GitHub
parent 32936e5de0
commit ce8b107f1e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 64 additions and 4 deletions

View File

@ -50,8 +50,10 @@ from ..coresys import CoreSys
from ..exceptions import ( from ..exceptions import (
AddonsError, AddonsError,
BackupError, BackupError,
BackupFileExistError,
BackupFileNotFoundError, BackupFileNotFoundError,
BackupInvalidError, BackupInvalidError,
BackupPermissionError,
) )
from ..jobs.const import JOB_GROUP_BACKUP from ..jobs.const import JOB_GROUP_BACKUP
from ..jobs.decorator import Job from ..jobs.decorator import Job
@ -457,18 +459,31 @@ class Backup(JobGroup):
def _open_outer_tarfile(): def _open_outer_tarfile():
"""Create and open outer tarfile.""" """Create and open outer tarfile."""
if self.tarfile.is_file(): if self.tarfile.is_file():
raise BackupError( raise BackupFileExistError(
f"Cannot make new backup at {self.tarfile.as_posix()}, file already exists!", f"Cannot make new backup at {self.tarfile.as_posix()}, file already exists!",
_LOGGER.error, _LOGGER.error,
) )
outer_secure_tarfile = SecureTarFile( _outer_secure_tarfile = SecureTarFile(
self.tarfile, self.tarfile,
"w", "w",
gzip=False, gzip=False,
bufsize=BUF_SIZE, bufsize=BUF_SIZE,
) )
return outer_secure_tarfile, outer_secure_tarfile.open() try:
_outer_tarfile = _outer_secure_tarfile.open()
except PermissionError as ex:
raise BackupPermissionError(
f"Cannot open backup file {self.tarfile.as_posix()}, permission error!",
_LOGGER.error,
) from ex
except OSError as ex:
raise BackupError(
f"Cannot open backup file {self.tarfile.as_posix()} for writing",
_LOGGER.error,
) from ex
return _outer_secure_tarfile, _outer_tarfile
def _close_outer_tarfile() -> int: def _close_outer_tarfile() -> int:
"""Close outer tarfile.""" """Close outer tarfile."""

View File

@ -663,6 +663,14 @@ class BackupFileNotFoundError(BackupError):
"""Raise if the backup file hasn't been found.""" """Raise if the backup file hasn't been found."""
class BackupPermissionError(BackupError):
"""Raise if we could not write the backup due to permission error."""
class BackupFileExistError(BackupError):
"""Raise if the backup file already exists."""
# Security # Security

View File

@ -11,7 +11,12 @@ import pytest
from supervisor.backups.backup import Backup from supervisor.backups.backup import Backup
from supervisor.backups.const import BackupType from supervisor.backups.const import BackupType
from supervisor.coresys import CoreSys from supervisor.coresys import CoreSys
from supervisor.exceptions import BackupFileNotFoundError, BackupInvalidError from supervisor.exceptions import (
BackupFileExistError,
BackupFileNotFoundError,
BackupInvalidError,
BackupPermissionError,
)
from tests.common import get_fixture_path from tests.common import get_fixture_path
@ -30,6 +35,38 @@ async def test_new_backup_stays_in_folder(coresys: CoreSys, tmp_path: Path):
assert backup.tarfile.exists() assert backup.tarfile.exists()
async def test_new_backup_permission_error(coresys: CoreSys, tmp_path: Path):
"""Test if a permission error is correctly handled when a new backup is created."""
backup = Backup(coresys, tmp_path / "my_backup.tar", "test", None)
backup.new("test", "2023-07-21T21:05:00.000000+00:00", BackupType.FULL)
assert not listdir(tmp_path)
with (
patch(
"tarfile.open",
MagicMock(side_effect=PermissionError),
),
pytest.raises(BackupPermissionError),
):
async with backup.create():
assert len(listdir(tmp_path)) == 1
assert backup.tarfile.exists()
async def test_new_backup_exists_error(coresys: CoreSys, tmp_path: Path):
"""Test if a permission error is correctly handled when a new backup is created."""
backup_file = tmp_path / "my_backup.tar"
backup = Backup(coresys, backup_file, "test", None)
backup.new("test", "2023-07-21T21:05:00.000000+00:00", BackupType.FULL)
backup_file.touch()
with (
pytest.raises(BackupFileExistError),
):
async with backup.create():
pass
async def test_consolidate_conflict_varied_encryption( async def test_consolidate_conflict_varied_encryption(
coresys: CoreSys, tmp_path: Path, caplog: pytest.LogCaptureFixture coresys: CoreSys, tmp_path: Path, caplog: pytest.LogCaptureFixture
): ):