From c2f6e319f202bf76a1ec1a3d8bc3a6d97b4df56c Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Tue, 31 Dec 2024 13:58:12 +0100 Subject: [PATCH] Check password early on backup restore (#5519) Introduce a validate password method which only peaks into the archive to validate the password before starting the actual restore process. This makes sure that a wrong password returns an error even when restoring the backup in background. --- supervisor/backups/backup.py | 53 +++++++++++++++++++++++++++++++---- supervisor/backups/manager.py | 20 +++++++------ tests/api/test_backups.py | 2 +- tests/backups/test_manager.py | 11 ++++++-- 4 files changed, 70 insertions(+), 16 deletions(-) diff --git a/supervisor/backups/backup.py b/supervisor/backups/backup.py index 3f32b0259..70736c0c9 100644 --- a/supervisor/backups/backup.py +++ b/supervisor/backups/backup.py @@ -296,12 +296,13 @@ class Backup(JobGroup): if not compressed: self._data[ATTR_COMPRESSED] = False - def set_password(self, password: str) -> bool: + def set_password(self, password: str | None) -> None: """Set the password for an existing backup.""" - if not password: - return False - self._init_password(password) - return True + if password: + self._init_password(password) + else: + self._key = None + self._aes = None def _init_password(self, password: str) -> None: """Set password + init aes cipher.""" @@ -334,6 +335,48 @@ class Backup(JobGroup): data = padder.update(decrypt.update(b64decode(data))) + padder.finalize() return data.decode() + async def validate_password(self) -> bool: + """Validate backup password. + + Returns false only when the password is known to be wrong. + """ + + def _validate_file() -> bool: + ending = f".tar{'.gz' if self.compressed else ''}" + + with tarfile.open(self.tarfile, "r:") as backup: + test_tar_name = next( + ( + entry.name + for entry in backup.getmembers() + if entry.name.endswith(ending) + ), + None, + ) + if not test_tar_name: + _LOGGER.warning("No tar file found to validate password with") + return True + + test_tar_file = backup.extractfile(test_tar_name) + try: + with SecureTarFile( + ending, # Not used + gzip=self.compressed, + key=self._key, + mode="r", + fileobj=test_tar_file, + ): + # If we can read the tar file, the password is correct + return True + except tarfile.ReadError: + _LOGGER.debug("Invalid password") + return False + except Exception: # pylint: disable=broad-exception-caught + _LOGGER.exception("Unexpected error validating password") + return True + + return await self.sys_run_in_executor(_validate_file) + async def load(self): """Read backup.json from tar file.""" if not self.tarfile.is_file(): diff --git a/supervisor/backups/manager.py b/supervisor/backups/manager.py index e4e899780..7a655af53 100644 --- a/supervisor/backups/manager.py +++ b/supervisor/backups/manager.py @@ -687,10 +687,12 @@ class BackupManager(FileConfiguration, JobGroup): f"{backup.slug} is only a partial backup!", _LOGGER.error ) - if backup.protected and not backup.set_password(password): - raise BackupInvalidError( - f"Invalid password for backup {backup.slug}", _LOGGER.error - ) + if backup.protected: + backup.set_password(password) + if not await backup.validate_password(): + raise BackupInvalidError( + f"Invalid password for backup {backup.slug}", _LOGGER.error + ) if backup.supervisor_version > self.sys_supervisor.version: raise BackupInvalidError( @@ -755,10 +757,12 @@ class BackupManager(FileConfiguration, JobGroup): folder_list.remove(FOLDER_HOMEASSISTANT) homeassistant = True - if backup.protected and not backup.set_password(password): - raise BackupInvalidError( - f"Invalid password for backup {backup.slug}", _LOGGER.error - ) + if backup.protected: + backup.set_password(password) + if not await backup.validate_password(): + raise BackupInvalidError( + f"Invalid password for backup {backup.slug}", _LOGGER.error + ) if backup.homeassistant is None and homeassistant: raise BackupInvalidError( diff --git a/tests/api/test_backups.py b/tests/api/test_backups.py index cb607f594..7b8710a90 100644 --- a/tests/api/test_backups.py +++ b/tests/api/test_backups.py @@ -478,7 +478,7 @@ async def test_restore_immediate_errors( with ( patch.object(Backup, "protected", new=PropertyMock(return_value=True)), - patch.object(Backup, "set_password", return_value=False), + patch.object(Backup, "validate_password", return_value=False), ): resp = await api_client.post( f"/backups/{mock_partial_backup.slug}/restore/partial", diff --git a/tests/backups/test_manager.py b/tests/backups/test_manager.py index 1f267228b..bbf4f4720 100644 --- a/tests/backups/test_manager.py +++ b/tests/backups/test_manager.py @@ -209,6 +209,7 @@ async def test_do_restore_full(coresys: CoreSys, full_backup_mock, install_addon manager = BackupManager(coresys) backup_instance = full_backup_mock.return_value + backup_instance.protected = False backup_instance.sys_addons = coresys.addons backup_instance.remove_delta_addons = partial( Backup.remove_delta_addons, backup_instance @@ -241,6 +242,7 @@ async def test_do_restore_full_different_addon( manager = BackupManager(coresys) backup_instance = full_backup_mock.return_value + backup_instance.protected = False backup_instance.addon_list = ["differentslug"] backup_instance.sys_addons = coresys.addons backup_instance.remove_delta_addons = partial( @@ -273,6 +275,7 @@ async def test_do_restore_partial_minimal( manager = BackupManager(coresys) backup_instance = partial_backup_mock.return_value + backup_instance.protected = False assert await manager.do_restore_partial(backup_instance, homeassistant=False) backup_instance.restore_homeassistant.assert_not_called() @@ -297,6 +300,7 @@ async def test_do_restore_partial_maximal(coresys: CoreSys, partial_backup_mock) manager = BackupManager(coresys) backup_instance = partial_backup_mock.return_value + backup_instance.protected = False assert await manager.do_restore_partial( backup_instance, addons=[TEST_ADDON_SLUG], @@ -330,7 +334,7 @@ async def test_fail_invalid_full_backup( backup_instance = full_backup_mock.return_value backup_instance.protected = True - backup_instance.set_password.return_value = False + backup_instance.validate_password = AsyncMock(return_value=False) with pytest.raises(BackupInvalidError): await manager.do_restore_full(backup_instance) @@ -359,7 +363,7 @@ async def test_fail_invalid_partial_backup( backup_instance = partial_backup_mock.return_value backup_instance.protected = True - backup_instance.set_password.return_value = False + backup_instance.validate_password = AsyncMock(return_value=False) with pytest.raises(BackupInvalidError): await manager.do_restore_partial(backup_instance) @@ -407,6 +411,7 @@ async def test_restore_error( coresys.homeassistant.core.start = AsyncMock(return_value=None) backup_instance = full_backup_mock.return_value + backup_instance.protected = False backup_instance.restore_dockerconfig.side_effect = BackupError() with pytest.raises(BackupError): await coresys.backups.do_restore_full(backup_instance) @@ -1818,6 +1823,7 @@ async def test_monitoring_after_full_restore( manager = BackupManager(coresys) backup_instance = full_backup_mock.return_value + backup_instance.protected = False assert await manager.do_restore_full(backup_instance) backup_instance.restore_addons.assert_called_once_with([TEST_ADDON_SLUG]) @@ -1835,6 +1841,7 @@ async def test_monitoring_after_partial_restore( manager = BackupManager(coresys) backup_instance = partial_backup_mock.return_value + backup_instance.protected = False assert await manager.do_restore_partial(backup_instance, addons=[TEST_ADDON_SLUG]) backup_instance.restore_addons.assert_called_once_with([TEST_ADDON_SLUG])