"""Tests for the Backup integration's utility functions."""

from __future__ import annotations

from collections.abc import AsyncIterator, Callable, Coroutine
import dataclasses
from pathlib import Path
import tarfile
from typing import Any
from unittest.mock import Mock, patch

import pytest
import securetar

from homeassistant.components.backup import DOMAIN, AddonInfo, AgentBackup, Folder
from homeassistant.components.backup.util import (
    DecryptedBackupStreamer,
    EncryptedBackupStreamer,
    read_backup,
    suggested_filename,
    validate_password,
)
from homeassistant.core import HomeAssistant

from tests.common import get_fixture_path

# The streamer tests expect the output to be followed by
# 4 x 10240 byte of padding.
EXPECTED_PADDING = b"\0" * 40960


def _make_backup(*, protected: bool, size: int) -> AgentBackup:
    """Return the backup metadata shared by the streamer tests.

    Only ``protected`` and ``size`` differ between the tests; every other
    field is fixed.
    """
    return AgentBackup(
        addons=["addon_1", "addon_2"],
        backup_id="1234",
        date="2024-12-02T07:23:58.261875-05:00",
        database_included=False,
        extra_metadata={},
        folders=[],
        homeassistant_included=True,
        homeassistant_version="2024.12.0.dev0",
        name="test",
        protected=protected,
        size=size,
    )


def _file_opener(
    path: Path,
) -> Callable[[], Coroutine[Any, Any, AsyncIterator[bytes]]]:
    """Return an ``open_backup`` callback streaming ``path`` in 1024 byte chunks.

    Each call of the returned callback creates a fresh async generator, so the
    same callback can be used to open the stream more than once.
    """

    async def send_backup() -> AsyncIterator[bytes]:
        # The context manager closes the file when the generator is exhausted
        # or closed, instead of leaving the handle open.
        with path.open("rb") as backup_file:
            while chunk := backup_file.read(1024):
                yield chunk

    async def open_backup() -> AsyncIterator[bytes]:
        return send_backup()

    return open_backup


async def _read_stream(stream: AsyncIterator[bytes]) -> bytes:
    """Consume ``stream`` and return all of its chunks joined together."""
    return b"".join([chunk async for chunk in stream])


@pytest.mark.parametrize(
    ("backup_json_content", "expected_backup"),
    [
        (
            b'{"compressed":true,"date":"2024-12-02T07:23:58.261875-05:00","homeassistant":'
            b'{"exclude_database":true,"version":"2024.12.0.dev0"},"name":"test",'
            b'"protected":true,"slug":"455645fe","type":"partial","version":2}',
            AgentBackup(
                addons=[],
                backup_id="455645fe",
                date="2024-12-02T07:23:58.261875-05:00",
                database_included=False,
                extra_metadata={},
                folders=[],
                homeassistant_included=True,
                homeassistant_version="2024.12.0.dev0",
                name="test",
                protected=True,
                size=1234,
            ),
        ),
        (
            b'{"slug":"d4b8fdc6","version":2,"name":"Core 2025.1.0.dev0",'
            b'"date":"2024-12-20T11:27:51.119062+00:00","type":"partial",'
            b'"supervisor_version":"2024.12.1.dev1803",'
            b'"extra":{"instance_id":"6b453733d2d74d2a9ae432ff2fbaaa64",'
            b'"with_automatic_settings":false},"homeassistant":'
            b'{"version":"2025.1.0.dev202412200230","exclude_database":false,"size":0.0},'
            b'"compressed":true,"protected":true,"repositories":['
            b'"https://github.com/home-assistant/hassio-addons-development","local",'
            b'"https://github.com/esphome/home-assistant-addon","core",'
            b'"https://github.com/music-assistant/home-assistant-addon",'
            b'"https://github.com/hassio-addons/repository"],"crypto":"aes128",'
            b'"folders":["share","media"],"addons":[{"slug":"core_configurator",'
            b'"name":"File editor","version":"5.5.0","size":0.0},'
            b'{"slug":"ae6e943c_remote_api","name":"Remote API proxy",'
            b'"version":"1.3.0","size":0.0}],"docker":{"registries":{}}}',
            AgentBackup(
                addons=[
                    AddonInfo(
                        name="File editor",
                        slug="core_configurator",
                        version="5.5.0",
                    ),
                    AddonInfo(
                        name="Remote API proxy",
                        slug="ae6e943c_remote_api",
                        version="1.3.0",
                    ),
                ],
                backup_id="d4b8fdc6",
                date="2024-12-20T11:27:51.119062+00:00",
                database_included=True,
                extra_metadata={
                    "instance_id": "6b453733d2d74d2a9ae432ff2fbaaa64",
                    "with_automatic_settings": False,
                },
                folders=[Folder.SHARE, Folder.MEDIA],
                homeassistant_included=True,
                homeassistant_version="2025.1.0.dev202412200230",
                name="Core 2025.1.0.dev0",
                protected=True,
                size=1234,
            ),
        ),
    ],
)
def test_read_backup(backup_json_content: bytes, expected_backup: AgentBackup) -> None:
    """Test reading a backup."""
    mock_path = Mock()
    # The expected backups use this value as their size.
    mock_path.stat.return_value.st_size = 1234

    with patch("homeassistant.components.backup.util.tarfile.open") as mock_open_tar:
        # Make the mocked tar file hand back the parametrized JSON content.
        mock_tar = mock_open_tar.return_value.__enter__.return_value
        mock_tar.extractfile.return_value.read.return_value = backup_json_content
        backup = read_backup(mock_path)
        assert backup == expected_backup


@pytest.mark.parametrize("password", [None, "hunter2"])
def test_validate_password(password: str | None) -> None:
    """Test validating a password."""
    mock_path = Mock()

    with (
        patch("homeassistant.components.backup.util.tarfile.open"),
        patch("homeassistant.components.backup.util.SecureTarFile"),
    ):
        assert validate_password(mock_path, password) is True


@pytest.mark.parametrize("password", [None, "hunter2"])
@pytest.mark.parametrize("secure_tar_side_effect", [tarfile.ReadError, Exception])
def test_validate_password_wrong_password(
    password: str | None, secure_tar_side_effect: type[Exception]
) -> None:
    """Test validating a password when entering the secure tar file raises."""
    mock_path = Mock()

    with (
        patch("homeassistant.components.backup.util.tarfile.open"),
        patch(
            "homeassistant.components.backup.util.SecureTarFile",
        ) as mock_secure_tar,
    ):
        mock_secure_tar.return_value.__enter__.side_effect = secure_tar_side_effect
        assert validate_password(mock_path, password) is False


def test_validate_password_no_homeassistant() -> None:
    """Test validating a password when extracting from the tar raises KeyError."""
    mock_path = Mock()

    with (
        patch("homeassistant.components.backup.util.tarfile.open") as mock_open_tar,
    ):
        mock_open_tar.return_value.__enter__.return_value.extractfile.side_effect = (
            KeyError
        )
        assert validate_password(mock_path, "hunter2") is False


async def test_decrypted_backup_streamer(hass: HomeAssistant) -> None:
    """Test the decrypted backup streamer."""
    decrypted_backup_path = get_fixture_path(
        "test_backups/c0cb53bd.tar.decrypted", DOMAIN
    )
    encrypted_backup_path = get_fixture_path("test_backups/c0cb53bd.tar", DOMAIN)
    backup = _make_backup(protected=True, size=encrypted_backup_path.stat().st_size)
    open_backup = _file_opener(encrypted_backup_path)

    decryptor = DecryptedBackupStreamer(hass, backup, open_backup, "hunter2")
    assert decryptor.backup() == dataclasses.replace(
        backup, protected=False, size=backup.size + len(EXPECTED_PADDING)
    )

    decrypted_output = await _read_stream(await decryptor.open_stream())
    await decryptor.wait()

    # Expect the output to match the stored decrypted backup file, with additional
    # padding.
    decrypted_backup_data = decrypted_backup_path.read_bytes()
    assert decrypted_output == decrypted_backup_data + EXPECTED_PADDING


async def test_decrypted_backup_streamer_wrong_password(hass: HomeAssistant) -> None:
    """Test the decrypted backup streamer with wrong password."""
    encrypted_backup_path = get_fixture_path("test_backups/c0cb53bd.tar", DOMAIN)
    backup = _make_backup(protected=True, size=encrypted_backup_path.stat().st_size)
    open_backup = _file_opener(encrypted_backup_path)

    decryptor = DecryptedBackupStreamer(hass, backup, open_backup, "wrong_password")
    decrypted_stream = await decryptor.open_stream()
    async for _ in decrypted_stream:
        pass

    await decryptor.wait()
    assert isinstance(decryptor._workers[0].error, securetar.SecureTarReadError)


async def test_encrypted_backup_streamer(hass: HomeAssistant) -> None:
    """Test the encrypted backup streamer."""
    decrypted_backup_path = get_fixture_path(
        "test_backups/c0cb53bd.tar.decrypted", DOMAIN
    )
    encrypted_backup_path = get_fixture_path("test_backups/c0cb53bd.tar", DOMAIN)
    backup = _make_backup(protected=False, size=decrypted_backup_path.stat().st_size)
    open_backup = _file_opener(decrypted_backup_path)

    # Patch os.urandom to return values matching the nonce used in the encrypted
    # test backup. The backup has three inner tar files, but we need an extra nonce
    # for a future planned supervisor.tar.
    with patch("os.urandom") as mock_randbytes:
        mock_randbytes.side_effect = (
            bytes.fromhex("bd34ea6fc93b0614ce7af2b44b4f3957"),
            bytes.fromhex("1296d6f7554e2cb629a3dc4082bae36c"),
            bytes.fromhex("8b7a58e48faf2efb23845eb3164382e0"),
            bytes.fromhex("00000000000000000000000000000000"),
        )
        encryptor = EncryptedBackupStreamer(hass, backup, open_backup, "hunter2")

        assert encryptor.backup() == dataclasses.replace(
            backup, protected=True, size=backup.size + len(EXPECTED_PADDING)
        )

        encrypted_output = await _read_stream(await encryptor.open_stream())
        await encryptor.wait()

    # Expect the output to match the stored encrypted backup file, with additional
    # padding.
    encrypted_backup_data = encrypted_backup_path.read_bytes()
    assert encrypted_output == encrypted_backup_data + EXPECTED_PADDING


async def test_encrypted_backup_streamer_random_nonce(hass: HomeAssistant) -> None:
    """Test the encrypted backup streamer without a patched nonce source."""
    decrypted_backup_path = get_fixture_path(
        "test_backups/c0cb53bd.tar.decrypted", DOMAIN
    )
    encrypted_backup_path = get_fixture_path("test_backups/c0cb53bd.tar", DOMAIN)
    backup = _make_backup(protected=False, size=decrypted_backup_path.stat().st_size)
    open_backup = _file_opener(decrypted_backup_path)

    encryptor1 = EncryptedBackupStreamer(hass, backup, open_backup, "hunter2")
    encryptor2 = EncryptedBackupStreamer(hass, backup, open_backup, "hunter2")

    # When reading twice from the same streamer, the same nonce is used.
    encrypted_output1 = await _read_stream(await encryptor1.open_stream())
    encrypted_output2 = await _read_stream(await encryptor1.open_stream())
    assert encrypted_output1 == encrypted_output2

    encrypted_output3 = await _read_stream(await encryptor2.open_stream())
    encrypted_output4 = await _read_stream(await encryptor2.open_stream())
    assert encrypted_output3 == encrypted_output4

    # Wait for workers to terminate
    await encryptor1.wait()
    await encryptor2.wait()

    # Output from the two streamers should differ but have the same length.
    assert encrypted_output1 != encrypted_output3
    assert len(encrypted_output1) == len(encrypted_output3)

    # Expect the output length to match the stored encrypted backup file, with
    # additional padding.
    encrypted_backup_data = encrypted_backup_path.read_bytes()
    assert len(encrypted_output1) == len(encrypted_backup_data) + len(EXPECTED_PADDING)
    assert encrypted_output1[: len(encrypted_backup_data)] != encrypted_backup_data


async def test_encrypted_backup_streamer_error(hass: HomeAssistant) -> None:
    """Test the encrypted backup streamer when opening the tar file fails."""
    decrypted_backup_path = get_fixture_path(
        "test_backups/c0cb53bd.tar.decrypted", DOMAIN
    )
    backup = _make_backup(protected=False, size=decrypted_backup_path.stat().st_size)
    open_backup = _file_opener(decrypted_backup_path)

    encryptor = EncryptedBackupStreamer(hass, backup, open_backup, "hunter2")

    # Make tarfile.open raise; the patch is kept active until the workers have
    # finished so the failure cannot be missed.
    with patch(
        "homeassistant.components.backup.util.tarfile.open",
        side_effect=tarfile.TarError,
    ):
        encrypted_stream = await encryptor.open_stream()
        async for _ in encrypted_stream:
            pass

        await encryptor.wait()

    # Expect the error to be recorded on the worker.
    assert isinstance(encryptor._workers[0].error, tarfile.TarError)


@pytest.mark.parametrize(
    ("name", "resulting_filename"),
    [
        ("test", "test_-_2025-01-30_13.42_12345678.tar"),
        (" leading spaces", "leading_spaces_-_2025-01-30_13.42_12345678.tar"),
        ("trailing spaces ", "trailing_spaces_-_2025-01-30_13.42_12345678.tar"),
        ("double spaces ", "double_spaces_-_2025-01-30_13.42_12345678.tar"),
    ],
)
def test_suggested_filename(name: str, resulting_filename: str) -> None:
    """Test suggesting a filename."""
    backup = AgentBackup(
        addons=[],
        backup_id="1234",
        date="2025-01-30 13:42:12.345678-05:00",
        database_included=False,
        extra_metadata={},
        folders=[],
        homeassistant_included=True,
        homeassistant_version="2024.12.0.dev0",
        name=name,
        protected=False,
        size=1234,
    )
    assert suggested_filename(backup) == resulting_filename