diff --git a/homeassistant/components/backup/manager.py b/homeassistant/components/backup/manager.py index 4a0b8553f1c..a27c1cc7170 100644 --- a/homeassistant/components/backup/manager.py +++ b/homeassistant/components/backup/manager.py @@ -48,7 +48,11 @@ from .const import ( ) from .models import AgentBackup, Folder from .store import BackupStore -from .util import make_backup_dir, read_backup +from .util import make_backup_dir, read_backup, validate_password + + +class IncorrectPasswordError(HomeAssistantError): + """Raised when the password is incorrect.""" @dataclass(frozen=True, kw_only=True, slots=True) @@ -1269,6 +1273,12 @@ class CoreBackupReaderWriter(BackupReaderWriter): remove_after_restore = True + password_valid = await self._hass.async_add_executor_job( + validate_password, path, password + ) + if not password_valid: + raise IncorrectPasswordError("The password provided is incorrect.") + def _write_restore_file() -> None: """Write the restore file.""" Path(self._hass.config.path(RESTORE_BACKUP_FILE)).write_text( diff --git a/homeassistant/components/backup/util.py b/homeassistant/components/backup/util.py index 0cedc07443a..930625c52ca 100644 --- a/homeassistant/components/backup/util.py +++ b/homeassistant/components/backup/util.py @@ -9,11 +9,13 @@ import tarfile from typing import cast import aiohttp +from securetar import SecureTarFile +from homeassistant.backup_restore import password_to_key from homeassistant.core import HomeAssistant from homeassistant.util.json import JsonObjectType, json_loads_object -from .const import BUF_SIZE +from .const import BUF_SIZE, LOGGER from .models import AddonInfo, AgentBackup, Folder @@ -71,6 +73,39 @@ def read_backup(backup_path: Path) -> AgentBackup: ) +def validate_password(path: Path, password: str | None) -> bool: + """Validate the password.""" + with tarfile.open(path, "r:", bufsize=BUF_SIZE) as backup_file: + compressed = False + ha_tar_name = "homeassistant.tar" + try: + ha_tar = backup_file.extractfile(ha_tar_name) + except KeyError: + compressed = True + ha_tar_name = "homeassistant.tar.gz" + try: + ha_tar = backup_file.extractfile(ha_tar_name) + except KeyError: + LOGGER.error("No homeassistant.tar or homeassistant.tar.gz found") + return False + try: + with SecureTarFile( + path, # Not used + gzip=compressed, + key=password_to_key(password) if password is not None else None, + mode="r", + fileobj=ha_tar, + ): + # If we can read the tar file, the password is correct + return True + except tarfile.ReadError: + LOGGER.debug("Invalid password") + return False + except Exception: # noqa: BLE001 + LOGGER.exception("Unexpected error validating password") + return False + + async def receive_file( hass: HomeAssistant, contents: aiohttp.BodyPartReader, path: Path ) -> None: diff --git a/homeassistant/components/backup/websocket.py b/homeassistant/components/backup/websocket.py index 718ffc3ae44..0139b7fdb77 100644 --- a/homeassistant/components/backup/websocket.py +++ b/homeassistant/components/backup/websocket.py @@ -9,7 +9,7 @@ from homeassistant.core import HomeAssistant, callback from .config import ScheduleState from .const import DATA_MANAGER, LOGGER -from .manager import ManagerStateEvent +from .manager import IncorrectPasswordError, ManagerStateEvent from .models import Folder @@ -131,16 +131,20 @@ async def handle_restore( msg: dict[str, Any], ) -> None: """Restore a backup.""" - await hass.data[DATA_MANAGER].async_restore_backup( - msg["backup_id"], - agent_id=msg["agent_id"], - password=msg.get("password"), - restore_addons=msg.get("restore_addons"), - restore_database=msg["restore_database"], - restore_folders=msg.get("restore_folders"), - restore_homeassistant=msg["restore_homeassistant"], - ) - connection.send_result(msg["id"]) + try: + await hass.data[DATA_MANAGER].async_restore_backup( + msg["backup_id"], + agent_id=msg["agent_id"], + password=msg.get("password"), + restore_addons=msg.get("restore_addons"), + restore_database=msg["restore_database"], + restore_folders=msg.get("restore_folders"), + restore_homeassistant=msg["restore_homeassistant"], + ) + except IncorrectPasswordError: + connection.send_error(msg["id"], "password_incorrect", "Incorrect password") + else: + connection.send_result(msg["id"]) @websocket_api.require_admin diff --git a/tests/components/backup/snapshots/test_websocket.ambr b/tests/components/backup/snapshots/test_websocket.ambr index dbad733d83a..4de06861b67 100644 --- a/tests/components/backup/snapshots/test_websocket.ambr +++ b/tests/components/backup/snapshots/test_websocket.ambr @@ -3050,6 +3050,17 @@ # name: test_restore_remote_agent[remote_agents1-backups1].1 1 # --- +# name: test_restore_wrong_password + dict({ + 'error': dict({ + 'code': 'password_incorrect', + 'message': 'Incorrect password', + }), + 'id': 1, + 'success': False, + 'type': 'result', + }) +# --- # name: test_subscribe_event dict({ 'event': dict({ diff --git a/tests/components/backup/test_manager.py b/tests/components/backup/test_manager.py index e976ad0c099..1c45c86149b 100644 --- a/tests/components/backup/test_manager.py +++ b/tests/components/backup/test_manager.py @@ -1120,6 +1120,9 @@ async def test_async_trigger_restore( patch("pathlib.Path.open"), patch("pathlib.Path.write_text") as mocked_write_text, patch("homeassistant.core.ServiceRegistry.async_call") as mocked_service_call, + patch( + "homeassistant.components.backup.manager.validate_password" + ) as validate_password_mock, patch.object(BackupAgentTest, "async_download_backup") as download_mock, ): download_mock.return_value.__aiter__.return_value = iter((b"backup data",)) @@ -1132,19 +1135,72 @@ async def test_async_trigger_restore( restore_folders=None, restore_homeassistant=restore_homeassistant, ) + backup_path = f"{hass.config.path()}/{dir}/abc123.tar" expected_restore_file = json.dumps( { - "path": f"{hass.config.path()}/{dir}/abc123.tar", + "path": backup_path, "password": password, "remove_after_restore": agent_id != LOCAL_AGENT_ID, "restore_database": restore_database, "restore_homeassistant": restore_homeassistant, } ) + validate_password_mock.assert_called_once_with(Path(backup_path), password) assert mocked_write_text.call_args[0][0] == expected_restore_file assert mocked_service_call.called +async def test_async_trigger_restore_wrong_password(hass: HomeAssistant) -> None: + """Test trigger restore.""" + password = "hunter2" + manager = BackupManager(hass, CoreBackupReaderWriter(hass)) + hass.data[DATA_MANAGER] = manager + + await _setup_backup_platform(hass, domain=DOMAIN, platform=local_backup_platform) + await _setup_backup_platform( + hass, + domain="test", + platform=Mock( + async_get_backup_agents=AsyncMock( + return_value=[BackupAgentTest("remote", backups=[TEST_BACKUP_ABC123])] + ), + spec_set=BackupAgentPlatformProtocol, + ), + ) + await manager.load_platforms() + + local_agent = manager.backup_agents[LOCAL_AGENT_ID] + local_agent._backups = {TEST_BACKUP_ABC123.backup_id: TEST_BACKUP_ABC123} + local_agent._loaded_backups = True + + with ( + patch("pathlib.Path.exists", return_value=True), + patch("pathlib.Path.write_text") as mocked_write_text, + patch("homeassistant.core.ServiceRegistry.async_call") as mocked_service_call, + patch( + "homeassistant.components.backup.manager.validate_password" + ) as validate_password_mock, + ): + validate_password_mock.return_value = False + with pytest.raises( + HomeAssistantError, match="The password provided is incorrect." + ): + await manager.async_restore_backup( + TEST_BACKUP_ABC123.backup_id, + agent_id=LOCAL_AGENT_ID, + password=password, + restore_addons=None, + restore_database=True, + restore_folders=None, + restore_homeassistant=True, + ) + + backup_path = f"{hass.config.path()}/backups/abc123.tar" + validate_password_mock.assert_called_once_with(Path(backup_path), password) + mocked_write_text.assert_not_called() + mocked_service_call.assert_not_called() + + @pytest.mark.parametrize( ("parameters", "expected_error"), [ @@ -1191,6 +1247,11 @@ async def test_async_trigger_restore_wrong_parameters( with ( patch("pathlib.Path.exists", return_value=True), + patch("pathlib.Path.write_text") as mocked_write_text, + patch("homeassistant.core.ServiceRegistry.async_call") as mocked_service_call, pytest.raises(HomeAssistantError, match=expected_error), ): await manager.async_restore_backup(**(default_parameters | parameters)) + + mocked_write_text.assert_not_called() + mocked_service_call.assert_not_called() diff --git a/tests/components/backup/test_util.py b/tests/components/backup/test_util.py index 888029f2e35..60cfc77b1aa 100644 --- a/tests/components/backup/test_util.py +++ b/tests/components/backup/test_util.py @@ -2,12 +2,13 @@ from __future__ import annotations +import tarfile from unittest.mock import Mock, patch import pytest from homeassistant.components.backup import AddonInfo, AgentBackup, Folder -from homeassistant.components.backup.util import read_backup +from homeassistant.components.backup.util import read_backup, validate_password @pytest.mark.parametrize( @@ -83,6 +84,49 @@ def test_read_backup(backup_json_content: bytes, expected_backup: AgentBackup) - mock_path.stat.return_value.st_size = 1234 with patch("homeassistant.components.backup.util.tarfile.open") as mock_open_tar: - mock_open_tar.return_value.__enter__.return_value.extractfile().read.return_value = backup_json_content + mock_open_tar.return_value.__enter__.return_value.extractfile.return_value.read.return_value = backup_json_content backup = read_backup(mock_path) assert backup == expected_backup + + +@pytest.mark.parametrize("password", [None, "hunter2"]) +def test_validate_password(password: str | None) -> None: + """Test validating a password.""" + mock_path = Mock() + + with ( + patch("homeassistant.components.backup.util.tarfile.open"), + patch("homeassistant.components.backup.util.SecureTarFile"), + ): + assert validate_password(mock_path, password) is True + + +@pytest.mark.parametrize("password", [None, "hunter2"]) +@pytest.mark.parametrize("secure_tar_side_effect", [tarfile.ReadError, Exception]) +def test_validate_password_wrong_password( + password: str | None, secure_tar_side_effect: Exception +) -> None: + """Test validating a password.""" + mock_path = Mock() + + with ( + patch("homeassistant.components.backup.util.tarfile.open"), + patch( + "homeassistant.components.backup.util.SecureTarFile", + ) as mock_secure_tar, + ): + mock_secure_tar.return_value.__enter__.side_effect = secure_tar_side_effect + assert validate_password(mock_path, password) is False + + +def test_validate_password_no_homeassistant() -> None: + """Test validating a password.""" + mock_path = Mock() + + with ( + patch("homeassistant.components.backup.util.tarfile.open") as mock_open_tar, + ): + mock_open_tar.return_value.__enter__.return_value.extractfile.side_effect = ( + KeyError + ) + assert validate_password(mock_path, "hunter2") is False diff --git a/tests/components/backup/test_websocket.py b/tests/components/backup/test_websocket.py index c75fb978082..b407241be54 100644 --- a/tests/components/backup/test_websocket.py +++ b/tests/components/backup/test_websocket.py @@ -571,6 +571,7 @@ async def test_restore_local_agent( with ( patch("pathlib.Path.exists", return_value=True), patch("pathlib.Path.write_text"), + patch("homeassistant.components.backup.manager.validate_password"), ): await client.send_json_auto_id( { @@ -606,7 +607,11 @@ async def test_restore_remote_agent( client = await hass_ws_client(hass) await hass.async_block_till_done() - with patch("pathlib.Path.write_text"), patch("pathlib.Path.open"): + with ( + patch("pathlib.Path.write_text"), + patch("pathlib.Path.open"), + patch("homeassistant.components.backup.manager.validate_password"), + ): await client.send_json_auto_id( { "type": "backup/restore", @@ -618,6 +623,39 @@ async def test_restore_remote_agent( assert len(restart_calls) == snapshot +async def test_restore_wrong_password( + hass: HomeAssistant, + hass_ws_client: WebSocketGenerator, + snapshot: SnapshotAssertion, +) -> None: + """Test calling the restore command.""" + await setup_backup_integration( + hass, with_hassio=False, backups={LOCAL_AGENT_ID: [TEST_BACKUP_ABC123]} + ) + restart_calls = async_mock_service(hass, "homeassistant", "restart") + + client = await hass_ws_client(hass) + await hass.async_block_till_done() + + with ( + patch("pathlib.Path.exists", return_value=True), + patch("pathlib.Path.write_text"), + patch( + "homeassistant.components.backup.manager.validate_password", + return_value=False, + ), + ): + await client.send_json_auto_id( + { + "type": "backup/restore", + "backup_id": "abc123", + "agent_id": "backup.local", + } + ) + assert await client.receive_json() == snapshot + assert len(restart_calls) == 0 + + @pytest.mark.parametrize( "access_token_fixture_name", ["hass_access_token", "hass_supervisor_access_token"],