Validate password before restoring backup (#133647)

* Validate password before restoring backup

* Raise specific error when password is incorrect
This commit is contained in:
Erik Montnemery 2024-12-20 15:43:46 +01:00 committed by GitHub
parent 1c0135880d
commit 5afb9a5053
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 220 additions and 17 deletions

View File

@ -48,7 +48,11 @@ from .const import (
) )
from .models import AgentBackup, Folder from .models import AgentBackup, Folder
from .store import BackupStore from .store import BackupStore
from .util import make_backup_dir, read_backup from .util import make_backup_dir, read_backup, validate_password
class IncorrectPasswordError(HomeAssistantError):
"""Raised when the password is incorrect."""
@dataclass(frozen=True, kw_only=True, slots=True) @dataclass(frozen=True, kw_only=True, slots=True)
@ -1269,6 +1273,12 @@ class CoreBackupReaderWriter(BackupReaderWriter):
remove_after_restore = True remove_after_restore = True
password_valid = await self._hass.async_add_executor_job(
validate_password, path, password
)
if not password_valid:
raise IncorrectPasswordError("The password provided is incorrect.")
def _write_restore_file() -> None: def _write_restore_file() -> None:
"""Write the restore file.""" """Write the restore file."""
Path(self._hass.config.path(RESTORE_BACKUP_FILE)).write_text( Path(self._hass.config.path(RESTORE_BACKUP_FILE)).write_text(

View File

@ -9,11 +9,13 @@ import tarfile
from typing import cast from typing import cast
import aiohttp import aiohttp
from securetar import SecureTarFile
from homeassistant.backup_restore import password_to_key
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.util.json import JsonObjectType, json_loads_object from homeassistant.util.json import JsonObjectType, json_loads_object
from .const import BUF_SIZE from .const import BUF_SIZE, LOGGER
from .models import AddonInfo, AgentBackup, Folder from .models import AddonInfo, AgentBackup, Folder
@ -71,6 +73,39 @@ def read_backup(backup_path: Path) -> AgentBackup:
) )
def validate_password(path: Path, password: str | None) -> bool:
"""Validate the password."""
with tarfile.open(path, "r:", bufsize=BUF_SIZE) as backup_file:
compressed = False
ha_tar_name = "homeassistant.tar"
try:
ha_tar = backup_file.extractfile(ha_tar_name)
except KeyError:
compressed = True
ha_tar_name = "homeassistant.tar.gz"
try:
ha_tar = backup_file.extractfile(ha_tar_name)
except KeyError:
LOGGER.error("No homeassistant.tar or homeassistant.tar.gz found")
return False
try:
with SecureTarFile(
path, # Not used
gzip=compressed,
key=password_to_key(password) if password is not None else None,
mode="r",
fileobj=ha_tar,
):
# If we can read the tar file, the password is correct
return True
except tarfile.ReadError:
LOGGER.debug("Invalid password")
return False
except Exception: # noqa: BLE001
LOGGER.exception("Unexpected error validating password")
return False
async def receive_file( async def receive_file(
hass: HomeAssistant, contents: aiohttp.BodyPartReader, path: Path hass: HomeAssistant, contents: aiohttp.BodyPartReader, path: Path
) -> None: ) -> None:

View File

@ -9,7 +9,7 @@ from homeassistant.core import HomeAssistant, callback
from .config import ScheduleState from .config import ScheduleState
from .const import DATA_MANAGER, LOGGER from .const import DATA_MANAGER, LOGGER
from .manager import ManagerStateEvent from .manager import IncorrectPasswordError, ManagerStateEvent
from .models import Folder from .models import Folder
@ -131,16 +131,20 @@ async def handle_restore(
msg: dict[str, Any], msg: dict[str, Any],
) -> None: ) -> None:
"""Restore a backup.""" """Restore a backup."""
await hass.data[DATA_MANAGER].async_restore_backup( try:
msg["backup_id"], await hass.data[DATA_MANAGER].async_restore_backup(
agent_id=msg["agent_id"], msg["backup_id"],
password=msg.get("password"), agent_id=msg["agent_id"],
restore_addons=msg.get("restore_addons"), password=msg.get("password"),
restore_database=msg["restore_database"], restore_addons=msg.get("restore_addons"),
restore_folders=msg.get("restore_folders"), restore_database=msg["restore_database"],
restore_homeassistant=msg["restore_homeassistant"], restore_folders=msg.get("restore_folders"),
) restore_homeassistant=msg["restore_homeassistant"],
connection.send_result(msg["id"]) )
except IncorrectPasswordError:
connection.send_error(msg["id"], "password_incorrect", "Incorrect password")
else:
connection.send_result(msg["id"])
@websocket_api.require_admin @websocket_api.require_admin

View File

@ -3050,6 +3050,17 @@
# name: test_restore_remote_agent[remote_agents1-backups1].1 # name: test_restore_remote_agent[remote_agents1-backups1].1
1 1
# --- # ---
# name: test_restore_wrong_password
dict({
'error': dict({
'code': 'password_incorrect',
'message': 'Incorrect password',
}),
'id': 1,
'success': False,
'type': 'result',
})
# ---
# name: test_subscribe_event # name: test_subscribe_event
dict({ dict({
'event': dict({ 'event': dict({

View File

@ -1120,6 +1120,9 @@ async def test_async_trigger_restore(
patch("pathlib.Path.open"), patch("pathlib.Path.open"),
patch("pathlib.Path.write_text") as mocked_write_text, patch("pathlib.Path.write_text") as mocked_write_text,
patch("homeassistant.core.ServiceRegistry.async_call") as mocked_service_call, patch("homeassistant.core.ServiceRegistry.async_call") as mocked_service_call,
patch(
"homeassistant.components.backup.manager.validate_password"
) as validate_password_mock,
patch.object(BackupAgentTest, "async_download_backup") as download_mock, patch.object(BackupAgentTest, "async_download_backup") as download_mock,
): ):
download_mock.return_value.__aiter__.return_value = iter((b"backup data",)) download_mock.return_value.__aiter__.return_value = iter((b"backup data",))
@ -1132,19 +1135,72 @@ async def test_async_trigger_restore(
restore_folders=None, restore_folders=None,
restore_homeassistant=restore_homeassistant, restore_homeassistant=restore_homeassistant,
) )
backup_path = f"{hass.config.path()}/{dir}/abc123.tar"
expected_restore_file = json.dumps( expected_restore_file = json.dumps(
{ {
"path": f"{hass.config.path()}/{dir}/abc123.tar", "path": backup_path,
"password": password, "password": password,
"remove_after_restore": agent_id != LOCAL_AGENT_ID, "remove_after_restore": agent_id != LOCAL_AGENT_ID,
"restore_database": restore_database, "restore_database": restore_database,
"restore_homeassistant": restore_homeassistant, "restore_homeassistant": restore_homeassistant,
} }
) )
validate_password_mock.assert_called_once_with(Path(backup_path), password)
assert mocked_write_text.call_args[0][0] == expected_restore_file assert mocked_write_text.call_args[0][0] == expected_restore_file
assert mocked_service_call.called assert mocked_service_call.called
async def test_async_trigger_restore_wrong_password(hass: HomeAssistant) -> None:
"""Test trigger restore."""
password = "hunter2"
manager = BackupManager(hass, CoreBackupReaderWriter(hass))
hass.data[DATA_MANAGER] = manager
await _setup_backup_platform(hass, domain=DOMAIN, platform=local_backup_platform)
await _setup_backup_platform(
hass,
domain="test",
platform=Mock(
async_get_backup_agents=AsyncMock(
return_value=[BackupAgentTest("remote", backups=[TEST_BACKUP_ABC123])]
),
spec_set=BackupAgentPlatformProtocol,
),
)
await manager.load_platforms()
local_agent = manager.backup_agents[LOCAL_AGENT_ID]
local_agent._backups = {TEST_BACKUP_ABC123.backup_id: TEST_BACKUP_ABC123}
local_agent._loaded_backups = True
with (
patch("pathlib.Path.exists", return_value=True),
patch("pathlib.Path.write_text") as mocked_write_text,
patch("homeassistant.core.ServiceRegistry.async_call") as mocked_service_call,
patch(
"homeassistant.components.backup.manager.validate_password"
) as validate_password_mock,
):
validate_password_mock.return_value = False
with pytest.raises(
HomeAssistantError, match="The password provided is incorrect."
):
await manager.async_restore_backup(
TEST_BACKUP_ABC123.backup_id,
agent_id=LOCAL_AGENT_ID,
password=password,
restore_addons=None,
restore_database=True,
restore_folders=None,
restore_homeassistant=True,
)
backup_path = f"{hass.config.path()}/backups/abc123.tar"
validate_password_mock.assert_called_once_with(Path(backup_path), password)
mocked_write_text.assert_not_called()
mocked_service_call.assert_not_called()
@pytest.mark.parametrize( @pytest.mark.parametrize(
("parameters", "expected_error"), ("parameters", "expected_error"),
[ [
@ -1191,6 +1247,11 @@ async def test_async_trigger_restore_wrong_parameters(
with ( with (
patch("pathlib.Path.exists", return_value=True), patch("pathlib.Path.exists", return_value=True),
patch("pathlib.Path.write_text") as mocked_write_text,
patch("homeassistant.core.ServiceRegistry.async_call") as mocked_service_call,
pytest.raises(HomeAssistantError, match=expected_error), pytest.raises(HomeAssistantError, match=expected_error),
): ):
await manager.async_restore_backup(**(default_parameters | parameters)) await manager.async_restore_backup(**(default_parameters | parameters))
mocked_write_text.assert_not_called()
mocked_service_call.assert_not_called()

View File

@ -2,12 +2,13 @@
from __future__ import annotations from __future__ import annotations
import tarfile
from unittest.mock import Mock, patch from unittest.mock import Mock, patch
import pytest import pytest
from homeassistant.components.backup import AddonInfo, AgentBackup, Folder from homeassistant.components.backup import AddonInfo, AgentBackup, Folder
from homeassistant.components.backup.util import read_backup from homeassistant.components.backup.util import read_backup, validate_password
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -83,6 +84,49 @@ def test_read_backup(backup_json_content: bytes, expected_backup: AgentBackup) -
mock_path.stat.return_value.st_size = 1234 mock_path.stat.return_value.st_size = 1234
with patch("homeassistant.components.backup.util.tarfile.open") as mock_open_tar: with patch("homeassistant.components.backup.util.tarfile.open") as mock_open_tar:
mock_open_tar.return_value.__enter__.return_value.extractfile().read.return_value = backup_json_content mock_open_tar.return_value.__enter__.return_value.extractfile.return_value.read.return_value = backup_json_content
backup = read_backup(mock_path) backup = read_backup(mock_path)
assert backup == expected_backup assert backup == expected_backup
@pytest.mark.parametrize("password", [None, "hunter2"])
def test_validate_password(password: str | None) -> None:
"""Test validating a password."""
mock_path = Mock()
with (
patch("homeassistant.components.backup.util.tarfile.open"),
patch("homeassistant.components.backup.util.SecureTarFile"),
):
assert validate_password(mock_path, password) is True
@pytest.mark.parametrize("password", [None, "hunter2"])
@pytest.mark.parametrize("secure_tar_side_effect", [tarfile.ReadError, Exception])
def test_validate_password_wrong_password(
password: str | None, secure_tar_side_effect: Exception
) -> None:
"""Test validating a password."""
mock_path = Mock()
with (
patch("homeassistant.components.backup.util.tarfile.open"),
patch(
"homeassistant.components.backup.util.SecureTarFile",
) as mock_secure_tar,
):
mock_secure_tar.return_value.__enter__.side_effect = secure_tar_side_effect
assert validate_password(mock_path, password) is False
def test_validate_password_no_homeassistant() -> None:
"""Test validating a password."""
mock_path = Mock()
with (
patch("homeassistant.components.backup.util.tarfile.open") as mock_open_tar,
):
mock_open_tar.return_value.__enter__.return_value.extractfile.side_effect = (
KeyError
)
assert validate_password(mock_path, "hunter2") is False

View File

@ -571,6 +571,7 @@ async def test_restore_local_agent(
with ( with (
patch("pathlib.Path.exists", return_value=True), patch("pathlib.Path.exists", return_value=True),
patch("pathlib.Path.write_text"), patch("pathlib.Path.write_text"),
patch("homeassistant.components.backup.manager.validate_password"),
): ):
await client.send_json_auto_id( await client.send_json_auto_id(
{ {
@ -606,7 +607,11 @@ async def test_restore_remote_agent(
client = await hass_ws_client(hass) client = await hass_ws_client(hass)
await hass.async_block_till_done() await hass.async_block_till_done()
with patch("pathlib.Path.write_text"), patch("pathlib.Path.open"): with (
patch("pathlib.Path.write_text"),
patch("pathlib.Path.open"),
patch("homeassistant.components.backup.manager.validate_password"),
):
await client.send_json_auto_id( await client.send_json_auto_id(
{ {
"type": "backup/restore", "type": "backup/restore",
@ -618,6 +623,39 @@ async def test_restore_remote_agent(
assert len(restart_calls) == snapshot assert len(restart_calls) == snapshot
async def test_restore_wrong_password(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
snapshot: SnapshotAssertion,
) -> None:
"""Test calling the restore command."""
await setup_backup_integration(
hass, with_hassio=False, backups={LOCAL_AGENT_ID: [TEST_BACKUP_ABC123]}
)
restart_calls = async_mock_service(hass, "homeassistant", "restart")
client = await hass_ws_client(hass)
await hass.async_block_till_done()
with (
patch("pathlib.Path.exists", return_value=True),
patch("pathlib.Path.write_text"),
patch(
"homeassistant.components.backup.manager.validate_password",
return_value=False,
),
):
await client.send_json_auto_id(
{
"type": "backup/restore",
"backup_id": "abc123",
"agent_id": "backup.local",
}
)
assert await client.receive_json() == snapshot
assert len(restart_calls) == 0
@pytest.mark.parametrize( @pytest.mark.parametrize(
"access_token_fixture_name", "access_token_fixture_name",
["hass_access_token", "hass_supervisor_access_token"], ["hass_access_token", "hass_supervisor_access_token"],