From ce8b107f1ea0c22c976676e84a761c5870783874 Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Mon, 24 Feb 2025 21:34:23 +0100 Subject: [PATCH] Handle OS errors on backup create (#5662) * Handle permission error on backup create Make sure we handle (write) permission errors when creating a backup. * Introduce BackupFileExistError and BackupPermissionError exceptions * Make error messages a bit more uniform * Drop use of exclusive mode SecureTar does not handle exclusive mode nicely. Drop use of it for now. --- supervisor/backups/backup.py | 21 ++++++++++++++++--- supervisor/exceptions.py | 8 ++++++++ tests/backups/test_backup.py | 39 +++++++++++++++++++++++++++++++++++- 3 files changed, 64 insertions(+), 4 deletions(-) diff --git a/supervisor/backups/backup.py b/supervisor/backups/backup.py index 726327b07..6ded4acfa 100644 --- a/supervisor/backups/backup.py +++ b/supervisor/backups/backup.py @@ -50,8 +50,10 @@ from ..coresys import CoreSys from ..exceptions import ( AddonsError, BackupError, + BackupFileExistError, BackupFileNotFoundError, BackupInvalidError, + BackupPermissionError, ) from ..jobs.const import JOB_GROUP_BACKUP from ..jobs.decorator import Job @@ -457,18 +459,31 @@ class Backup(JobGroup): def _open_outer_tarfile(): """Create and open outer tarfile.""" if self.tarfile.is_file(): - raise BackupError( + raise BackupFileExistError( f"Cannot make new backup at {self.tarfile.as_posix()}, file already exists!", _LOGGER.error, ) - outer_secure_tarfile = SecureTarFile( + _outer_secure_tarfile = SecureTarFile( self.tarfile, "w", gzip=False, bufsize=BUF_SIZE, ) - return outer_secure_tarfile, outer_secure_tarfile.open() + try: + _outer_tarfile = _outer_secure_tarfile.open() + except PermissionError as ex: + raise BackupPermissionError( + f"Cannot open backup file {self.tarfile.as_posix()}, permission error!", + _LOGGER.error, + ) from ex + except OSError as ex: + raise BackupError( + f"Cannot open backup file {self.tarfile.as_posix()} for writing", + _LOGGER.error, + ) from ex + + return _outer_secure_tarfile, _outer_tarfile def _close_outer_tarfile() -> int: """Close outer tarfile.""" diff --git a/supervisor/exceptions.py b/supervisor/exceptions.py index f5a851f28..df26a626c 100644 --- a/supervisor/exceptions.py +++ b/supervisor/exceptions.py @@ -663,6 +663,14 @@ class BackupFileNotFoundError(BackupError): """Raise if the backup file hasn't been found.""" +class BackupPermissionError(BackupError): + """Raise if we could not write the backup due to permission error.""" + + +class BackupFileExistError(BackupError): + """Raise if the backup file already exists.""" + + # Security diff --git a/tests/backups/test_backup.py b/tests/backups/test_backup.py index 1340a5e67..13109c48a 100644 --- a/tests/backups/test_backup.py +++ b/tests/backups/test_backup.py @@ -11,7 +11,12 @@ import pytest from supervisor.backups.backup import Backup from supervisor.backups.const import BackupType from supervisor.coresys import CoreSys -from supervisor.exceptions import BackupFileNotFoundError, BackupInvalidError +from supervisor.exceptions import ( + BackupFileExistError, + BackupFileNotFoundError, + BackupInvalidError, + BackupPermissionError, +) from tests.common import get_fixture_path @@ -30,6 +35,38 @@ async def test_new_backup_stays_in_folder(coresys: CoreSys, tmp_path: Path): assert backup.tarfile.exists() +async def test_new_backup_permission_error(coresys: CoreSys, tmp_path: Path): + """Test if a permission error is correctly handled when a new backup is created.""" + backup = Backup(coresys, tmp_path / "my_backup.tar", "test", None) + backup.new("test", "2023-07-21T21:05:00.000000+00:00", BackupType.FULL) + assert not listdir(tmp_path) + + with ( + patch( + "tarfile.open", + MagicMock(side_effect=PermissionError), + ), + pytest.raises(BackupPermissionError), + ): + async with backup.create(): + assert len(listdir(tmp_path)) == 1 + assert backup.tarfile.exists() + + +async def test_new_backup_exists_error(coresys: CoreSys, tmp_path: Path): + """Test if a permission error is correctly handled when a new backup is created.""" + backup_file = tmp_path / "my_backup.tar" + backup = Backup(coresys, backup_file, "test", None) + backup.new("test", "2023-07-21T21:05:00.000000+00:00", BackupType.FULL) + backup_file.touch() + + with ( + pytest.raises(BackupFileExistError), + ): + async with backup.create(): + pass + + async def test_consolidate_conflict_varied_encryption( coresys: CoreSys, tmp_path: Path, caplog: pytest.LogCaptureFixture ):