mirror of
https://github.com/home-assistant/core.git
synced 2025-07-09 06:17:07 +00:00
Don't encrypt or decrypt unknown files in backup archives (#144495)
This commit is contained in:
parent
e0fb612e82
commit
42f53ff917
@ -22,7 +22,7 @@ from . import util
|
|||||||
from .agent import BackupAgent
|
from .agent import BackupAgent
|
||||||
from .const import DATA_MANAGER
|
from .const import DATA_MANAGER
|
||||||
from .manager import BackupManager
|
from .manager import BackupManager
|
||||||
from .models import BackupNotFound
|
from .models import AgentBackup, BackupNotFound
|
||||||
|
|
||||||
|
|
||||||
@callback
|
@callback
|
||||||
@ -85,7 +85,15 @@ class DownloadBackupView(HomeAssistantView):
|
|||||||
request, headers, backup_id, agent_id, agent, manager
|
request, headers, backup_id, agent_id, agent, manager
|
||||||
)
|
)
|
||||||
return await self._send_backup_with_password(
|
return await self._send_backup_with_password(
|
||||||
hass, request, headers, backup_id, agent_id, password, agent, manager
|
hass,
|
||||||
|
backup,
|
||||||
|
request,
|
||||||
|
headers,
|
||||||
|
backup_id,
|
||||||
|
agent_id,
|
||||||
|
password,
|
||||||
|
agent,
|
||||||
|
manager,
|
||||||
)
|
)
|
||||||
except BackupNotFound:
|
except BackupNotFound:
|
||||||
return Response(status=HTTPStatus.NOT_FOUND)
|
return Response(status=HTTPStatus.NOT_FOUND)
|
||||||
@ -116,6 +124,7 @@ class DownloadBackupView(HomeAssistantView):
|
|||||||
async def _send_backup_with_password(
|
async def _send_backup_with_password(
|
||||||
self,
|
self,
|
||||||
hass: HomeAssistant,
|
hass: HomeAssistant,
|
||||||
|
backup: AgentBackup,
|
||||||
request: Request,
|
request: Request,
|
||||||
headers: dict[istr, str],
|
headers: dict[istr, str],
|
||||||
backup_id: str,
|
backup_id: str,
|
||||||
@ -144,7 +153,8 @@ class DownloadBackupView(HomeAssistantView):
|
|||||||
|
|
||||||
stream = util.AsyncIteratorWriter(hass)
|
stream = util.AsyncIteratorWriter(hass)
|
||||||
worker = threading.Thread(
|
worker = threading.Thread(
|
||||||
target=util.decrypt_backup, args=[reader, stream, password, on_done, 0, []]
|
target=util.decrypt_backup,
|
||||||
|
args=[backup, reader, stream, password, on_done, 0, []],
|
||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
worker.start()
|
worker.start()
|
||||||
|
@ -295,13 +295,26 @@ def validate_password_stream(
|
|||||||
raise BackupEmpty
|
raise BackupEmpty
|
||||||
|
|
||||||
|
|
||||||
|
def _get_expected_archives(backup: AgentBackup) -> set[str]:
|
||||||
|
"""Get the expected archives in the backup."""
|
||||||
|
expected_archives = set()
|
||||||
|
if backup.homeassistant_included:
|
||||||
|
expected_archives.add("homeassistant")
|
||||||
|
for addon in backup.addons:
|
||||||
|
expected_archives.add(addon.slug)
|
||||||
|
for folder in backup.folders:
|
||||||
|
expected_archives.add(folder.value)
|
||||||
|
return expected_archives
|
||||||
|
|
||||||
|
|
||||||
def decrypt_backup(
|
def decrypt_backup(
|
||||||
|
backup: AgentBackup,
|
||||||
input_stream: IO[bytes],
|
input_stream: IO[bytes],
|
||||||
output_stream: IO[bytes],
|
output_stream: IO[bytes],
|
||||||
password: str | None,
|
password: str | None,
|
||||||
on_done: Callable[[Exception | None], None],
|
on_done: Callable[[Exception | None], None],
|
||||||
minimum_size: int,
|
minimum_size: int,
|
||||||
nonces: list[bytes],
|
nonces: NonceGenerator,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Decrypt a backup."""
|
"""Decrypt a backup."""
|
||||||
error: Exception | None = None
|
error: Exception | None = None
|
||||||
@ -315,7 +328,7 @@ def decrypt_backup(
|
|||||||
fileobj=output_stream, mode="w|", bufsize=BUF_SIZE
|
fileobj=output_stream, mode="w|", bufsize=BUF_SIZE
|
||||||
) as output_tar,
|
) as output_tar,
|
||||||
):
|
):
|
||||||
_decrypt_backup(input_tar, output_tar, password)
|
_decrypt_backup(backup, input_tar, output_tar, password)
|
||||||
except (DecryptError, SecureTarError, tarfile.TarError) as err:
|
except (DecryptError, SecureTarError, tarfile.TarError) as err:
|
||||||
LOGGER.warning("Error decrypting backup: %s", err)
|
LOGGER.warning("Error decrypting backup: %s", err)
|
||||||
error = err
|
error = err
|
||||||
@ -333,15 +346,18 @@ def decrypt_backup(
|
|||||||
|
|
||||||
|
|
||||||
def _decrypt_backup(
|
def _decrypt_backup(
|
||||||
|
backup: AgentBackup,
|
||||||
input_tar: tarfile.TarFile,
|
input_tar: tarfile.TarFile,
|
||||||
output_tar: tarfile.TarFile,
|
output_tar: tarfile.TarFile,
|
||||||
password: str | None,
|
password: str | None,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Decrypt a backup."""
|
"""Decrypt a backup."""
|
||||||
|
expected_archives = _get_expected_archives(backup)
|
||||||
for obj in input_tar:
|
for obj in input_tar:
|
||||||
# We compare with PurePath to avoid issues with different path separators,
|
# We compare with PurePath to avoid issues with different path separators,
|
||||||
# for example when backup.json is added as "./backup.json"
|
# for example when backup.json is added as "./backup.json"
|
||||||
if PurePath(obj.name) == PurePath("backup.json"):
|
object_path = PurePath(obj.name)
|
||||||
|
if object_path == PurePath("backup.json"):
|
||||||
# Rewrite the backup.json file to indicate that the backup is decrypted
|
# Rewrite the backup.json file to indicate that the backup is decrypted
|
||||||
if not (reader := input_tar.extractfile(obj)):
|
if not (reader := input_tar.extractfile(obj)):
|
||||||
raise DecryptError
|
raise DecryptError
|
||||||
@ -352,7 +368,13 @@ def _decrypt_backup(
|
|||||||
metadata_obj.size = len(updated_metadata_b)
|
metadata_obj.size = len(updated_metadata_b)
|
||||||
output_tar.addfile(metadata_obj, BytesIO(updated_metadata_b))
|
output_tar.addfile(metadata_obj, BytesIO(updated_metadata_b))
|
||||||
continue
|
continue
|
||||||
if not obj.name.endswith((".tar", ".tgz", ".tar.gz")):
|
prefix, _, suffix = object_path.name.partition(".")
|
||||||
|
if suffix not in ("tar", "tgz", "tar.gz"):
|
||||||
|
LOGGER.debug("Unknown file %s will not be decrypted", obj.name)
|
||||||
|
output_tar.addfile(obj, input_tar.extractfile(obj))
|
||||||
|
continue
|
||||||
|
if prefix not in expected_archives:
|
||||||
|
LOGGER.debug("Unknown inner tar file %s will not be decrypted", obj.name)
|
||||||
output_tar.addfile(obj, input_tar.extractfile(obj))
|
output_tar.addfile(obj, input_tar.extractfile(obj))
|
||||||
continue
|
continue
|
||||||
istf = SecureTarFile(
|
istf = SecureTarFile(
|
||||||
@ -371,12 +393,13 @@ def _decrypt_backup(
|
|||||||
|
|
||||||
|
|
||||||
def encrypt_backup(
|
def encrypt_backup(
|
||||||
|
backup: AgentBackup,
|
||||||
input_stream: IO[bytes],
|
input_stream: IO[bytes],
|
||||||
output_stream: IO[bytes],
|
output_stream: IO[bytes],
|
||||||
password: str | None,
|
password: str | None,
|
||||||
on_done: Callable[[Exception | None], None],
|
on_done: Callable[[Exception | None], None],
|
||||||
minimum_size: int,
|
minimum_size: int,
|
||||||
nonces: list[bytes],
|
nonces: NonceGenerator,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Encrypt a backup."""
|
"""Encrypt a backup."""
|
||||||
error: Exception | None = None
|
error: Exception | None = None
|
||||||
@ -390,7 +413,7 @@ def encrypt_backup(
|
|||||||
fileobj=output_stream, mode="w|", bufsize=BUF_SIZE
|
fileobj=output_stream, mode="w|", bufsize=BUF_SIZE
|
||||||
) as output_tar,
|
) as output_tar,
|
||||||
):
|
):
|
||||||
_encrypt_backup(input_tar, output_tar, password, nonces)
|
_encrypt_backup(backup, input_tar, output_tar, password, nonces)
|
||||||
except (EncryptError, SecureTarError, tarfile.TarError) as err:
|
except (EncryptError, SecureTarError, tarfile.TarError) as err:
|
||||||
LOGGER.warning("Error encrypting backup: %s", err)
|
LOGGER.warning("Error encrypting backup: %s", err)
|
||||||
error = err
|
error = err
|
||||||
@ -408,17 +431,20 @@ def encrypt_backup(
|
|||||||
|
|
||||||
|
|
||||||
def _encrypt_backup(
|
def _encrypt_backup(
|
||||||
|
backup: AgentBackup,
|
||||||
input_tar: tarfile.TarFile,
|
input_tar: tarfile.TarFile,
|
||||||
output_tar: tarfile.TarFile,
|
output_tar: tarfile.TarFile,
|
||||||
password: str | None,
|
password: str | None,
|
||||||
nonces: list[bytes],
|
nonces: NonceGenerator,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Encrypt a backup."""
|
"""Encrypt a backup."""
|
||||||
inner_tar_idx = 0
|
inner_tar_idx = 0
|
||||||
|
expected_archives = _get_expected_archives(backup)
|
||||||
for obj in input_tar:
|
for obj in input_tar:
|
||||||
# We compare with PurePath to avoid issues with different path separators,
|
# We compare with PurePath to avoid issues with different path separators,
|
||||||
# for example when backup.json is added as "./backup.json"
|
# for example when backup.json is added as "./backup.json"
|
||||||
if PurePath(obj.name) == PurePath("backup.json"):
|
object_path = PurePath(obj.name)
|
||||||
|
if object_path == PurePath("backup.json"):
|
||||||
# Rewrite the backup.json file to indicate that the backup is encrypted
|
# Rewrite the backup.json file to indicate that the backup is encrypted
|
||||||
if not (reader := input_tar.extractfile(obj)):
|
if not (reader := input_tar.extractfile(obj)):
|
||||||
raise EncryptError
|
raise EncryptError
|
||||||
@ -429,16 +455,21 @@ def _encrypt_backup(
|
|||||||
metadata_obj.size = len(updated_metadata_b)
|
metadata_obj.size = len(updated_metadata_b)
|
||||||
output_tar.addfile(metadata_obj, BytesIO(updated_metadata_b))
|
output_tar.addfile(metadata_obj, BytesIO(updated_metadata_b))
|
||||||
continue
|
continue
|
||||||
if not obj.name.endswith((".tar", ".tgz", ".tar.gz")):
|
prefix, _, suffix = object_path.name.partition(".")
|
||||||
|
if suffix not in ("tar", "tgz", "tar.gz"):
|
||||||
|
LOGGER.debug("Unknown file %s will not be encrypted", obj.name)
|
||||||
output_tar.addfile(obj, input_tar.extractfile(obj))
|
output_tar.addfile(obj, input_tar.extractfile(obj))
|
||||||
continue
|
continue
|
||||||
|
if prefix not in expected_archives:
|
||||||
|
LOGGER.debug("Unknown inner tar file %s will not be encrypted", obj.name)
|
||||||
|
continue
|
||||||
istf = SecureTarFile(
|
istf = SecureTarFile(
|
||||||
None, # Not used
|
None, # Not used
|
||||||
gzip=False,
|
gzip=False,
|
||||||
key=password_to_key(password) if password is not None else None,
|
key=password_to_key(password) if password is not None else None,
|
||||||
mode="r",
|
mode="r",
|
||||||
fileobj=input_tar.extractfile(obj),
|
fileobj=input_tar.extractfile(obj),
|
||||||
nonce=nonces[inner_tar_idx],
|
nonce=nonces.get(inner_tar_idx),
|
||||||
)
|
)
|
||||||
inner_tar_idx += 1
|
inner_tar_idx += 1
|
||||||
with istf.encrypt(obj) as encrypted:
|
with istf.encrypt(obj) as encrypted:
|
||||||
@ -456,17 +487,33 @@ class _CipherWorkerStatus:
|
|||||||
writer: AsyncIteratorWriter
|
writer: AsyncIteratorWriter
|
||||||
|
|
||||||
|
|
||||||
|
class NonceGenerator:
|
||||||
|
"""Generate nonces for encryption."""
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
"""Initialize the generator."""
|
||||||
|
self._nonces: dict[int, bytes] = {}
|
||||||
|
|
||||||
|
def get(self, index: int) -> bytes:
|
||||||
|
"""Get a nonce for the given index."""
|
||||||
|
if index not in self._nonces:
|
||||||
|
# Generate a new nonce for the given index
|
||||||
|
self._nonces[index] = os.urandom(16)
|
||||||
|
return self._nonces[index]
|
||||||
|
|
||||||
|
|
||||||
class _CipherBackupStreamer:
|
class _CipherBackupStreamer:
|
||||||
"""Encrypt or decrypt a backup."""
|
"""Encrypt or decrypt a backup."""
|
||||||
|
|
||||||
_cipher_func: Callable[
|
_cipher_func: Callable[
|
||||||
[
|
[
|
||||||
|
AgentBackup,
|
||||||
IO[bytes],
|
IO[bytes],
|
||||||
IO[bytes],
|
IO[bytes],
|
||||||
str | None,
|
str | None,
|
||||||
Callable[[Exception | None], None],
|
Callable[[Exception | None], None],
|
||||||
int,
|
int,
|
||||||
list[bytes],
|
NonceGenerator,
|
||||||
],
|
],
|
||||||
None,
|
None,
|
||||||
]
|
]
|
||||||
@ -484,7 +531,7 @@ class _CipherBackupStreamer:
|
|||||||
self._hass = hass
|
self._hass = hass
|
||||||
self._open_stream = open_stream
|
self._open_stream = open_stream
|
||||||
self._password = password
|
self._password = password
|
||||||
self._nonces: list[bytes] = []
|
self._nonces = NonceGenerator()
|
||||||
|
|
||||||
def size(self) -> int:
|
def size(self) -> int:
|
||||||
"""Return the maximum size of the decrypted or encrypted backup."""
|
"""Return the maximum size of the decrypted or encrypted backup."""
|
||||||
@ -508,7 +555,15 @@ class _CipherBackupStreamer:
|
|||||||
writer = AsyncIteratorWriter(self._hass)
|
writer = AsyncIteratorWriter(self._hass)
|
||||||
worker = threading.Thread(
|
worker = threading.Thread(
|
||||||
target=self._cipher_func,
|
target=self._cipher_func,
|
||||||
args=[reader, writer, self._password, on_done, self.size(), self._nonces],
|
args=[
|
||||||
|
self._backup,
|
||||||
|
reader,
|
||||||
|
writer,
|
||||||
|
self._password,
|
||||||
|
on_done,
|
||||||
|
self.size(),
|
||||||
|
self._nonces,
|
||||||
|
],
|
||||||
)
|
)
|
||||||
worker_status = _CipherWorkerStatus(
|
worker_status = _CipherWorkerStatus(
|
||||||
done=asyncio.Event(), reader=reader, thread=worker, writer=writer
|
done=asyncio.Event(), reader=reader, thread=worker, writer=writer
|
||||||
@ -538,17 +593,6 @@ class DecryptedBackupStreamer(_CipherBackupStreamer):
|
|||||||
class EncryptedBackupStreamer(_CipherBackupStreamer):
|
class EncryptedBackupStreamer(_CipherBackupStreamer):
|
||||||
"""Encrypt a backup."""
|
"""Encrypt a backup."""
|
||||||
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
hass: HomeAssistant,
|
|
||||||
backup: AgentBackup,
|
|
||||||
open_stream: Callable[[], Coroutine[Any, Any, AsyncIterator[bytes]]],
|
|
||||||
password: str | None,
|
|
||||||
) -> None:
|
|
||||||
"""Initialize."""
|
|
||||||
super().__init__(hass, backup, open_stream, password)
|
|
||||||
self._nonces = [os.urandom(16) for _ in range(self._num_tar_files())]
|
|
||||||
|
|
||||||
_cipher_func = staticmethod(encrypt_backup)
|
_cipher_func = staticmethod(encrypt_backup)
|
||||||
|
|
||||||
def backup(self) -> AgentBackup:
|
def backup(self) -> AgentBackup:
|
||||||
|
Binary file not shown.
Binary file not shown.
@ -177,7 +177,7 @@ async def _test_downloading_encrypted_backup(
|
|||||||
enc_metadata = json.loads(outer_tar.extractfile("./backup.json").read())
|
enc_metadata = json.loads(outer_tar.extractfile("./backup.json").read())
|
||||||
assert enc_metadata["protected"] is True
|
assert enc_metadata["protected"] is True
|
||||||
with (
|
with (
|
||||||
outer_tar.extractfile("core.tar.gz") as inner_tar_file,
|
outer_tar.extractfile("homeassistant.tar.gz") as inner_tar_file,
|
||||||
pytest.raises(tarfile.ReadError, match="file could not be opened"),
|
pytest.raises(tarfile.ReadError, match="file could not be opened"),
|
||||||
):
|
):
|
||||||
# pylint: disable-next=consider-using-with
|
# pylint: disable-next=consider-using-with
|
||||||
@ -209,7 +209,7 @@ async def _test_downloading_encrypted_backup(
|
|||||||
dec_metadata = json.loads(outer_tar.extractfile("./backup.json").read())
|
dec_metadata = json.loads(outer_tar.extractfile("./backup.json").read())
|
||||||
assert dec_metadata == enc_metadata | {"protected": False}
|
assert dec_metadata == enc_metadata | {"protected": False}
|
||||||
with (
|
with (
|
||||||
outer_tar.extractfile("core.tar.gz") as inner_tar_file,
|
outer_tar.extractfile("homeassistant.tar.gz") as inner_tar_file,
|
||||||
tarfile.open(fileobj=inner_tar_file, mode="r") as inner_tar,
|
tarfile.open(fileobj=inner_tar_file, mode="r") as inner_tar,
|
||||||
):
|
):
|
||||||
assert inner_tar.getnames() == [
|
assert inner_tar.getnames() == [
|
||||||
|
@ -174,7 +174,10 @@ async def test_decrypted_backup_streamer(hass: HomeAssistant) -> None:
|
|||||||
)
|
)
|
||||||
encrypted_backup_path = get_fixture_path("test_backups/c0cb53bd.tar", DOMAIN)
|
encrypted_backup_path = get_fixture_path("test_backups/c0cb53bd.tar", DOMAIN)
|
||||||
backup = AgentBackup(
|
backup = AgentBackup(
|
||||||
addons=["addon_1", "addon_2"],
|
addons=[
|
||||||
|
AddonInfo(name="Core 1", slug="core1", version="1.0.0"),
|
||||||
|
AddonInfo(name="Core 2", slug="core2", version="1.0.0"),
|
||||||
|
],
|
||||||
backup_id="1234",
|
backup_id="1234",
|
||||||
date="2024-12-02T07:23:58.261875-05:00",
|
date="2024-12-02T07:23:58.261875-05:00",
|
||||||
database_included=False,
|
database_included=False,
|
||||||
@ -218,7 +221,10 @@ async def test_decrypted_backup_streamer_interrupt_stuck_reader(
|
|||||||
"""Test the decrypted backup streamer."""
|
"""Test the decrypted backup streamer."""
|
||||||
encrypted_backup_path = get_fixture_path("test_backups/c0cb53bd.tar", DOMAIN)
|
encrypted_backup_path = get_fixture_path("test_backups/c0cb53bd.tar", DOMAIN)
|
||||||
backup = AgentBackup(
|
backup = AgentBackup(
|
||||||
addons=["addon_1", "addon_2"],
|
addons=[
|
||||||
|
AddonInfo(name="Core 1", slug="core1", version="1.0.0"),
|
||||||
|
AddonInfo(name="Core 2", slug="core2", version="1.0.0"),
|
||||||
|
],
|
||||||
backup_id="1234",
|
backup_id="1234",
|
||||||
date="2024-12-02T07:23:58.261875-05:00",
|
date="2024-12-02T07:23:58.261875-05:00",
|
||||||
database_included=False,
|
database_included=False,
|
||||||
@ -253,7 +259,10 @@ async def test_decrypted_backup_streamer_interrupt_stuck_writer(
|
|||||||
"""Test the decrypted backup streamer."""
|
"""Test the decrypted backup streamer."""
|
||||||
encrypted_backup_path = get_fixture_path("test_backups/c0cb53bd.tar", DOMAIN)
|
encrypted_backup_path = get_fixture_path("test_backups/c0cb53bd.tar", DOMAIN)
|
||||||
backup = AgentBackup(
|
backup = AgentBackup(
|
||||||
addons=["addon_1", "addon_2"],
|
addons=[
|
||||||
|
AddonInfo(name="Core 1", slug="core1", version="1.0.0"),
|
||||||
|
AddonInfo(name="Core 2", slug="core2", version="1.0.0"),
|
||||||
|
],
|
||||||
backup_id="1234",
|
backup_id="1234",
|
||||||
date="2024-12-02T07:23:58.261875-05:00",
|
date="2024-12-02T07:23:58.261875-05:00",
|
||||||
database_included=False,
|
database_included=False,
|
||||||
@ -283,7 +292,10 @@ async def test_decrypted_backup_streamer_wrong_password(hass: HomeAssistant) ->
|
|||||||
"""Test the decrypted backup streamer with wrong password."""
|
"""Test the decrypted backup streamer with wrong password."""
|
||||||
encrypted_backup_path = get_fixture_path("test_backups/c0cb53bd.tar", DOMAIN)
|
encrypted_backup_path = get_fixture_path("test_backups/c0cb53bd.tar", DOMAIN)
|
||||||
backup = AgentBackup(
|
backup = AgentBackup(
|
||||||
addons=["addon_1", "addon_2"],
|
addons=[
|
||||||
|
AddonInfo(name="Core 1", slug="core1", version="1.0.0"),
|
||||||
|
AddonInfo(name="Core 2", slug="core2", version="1.0.0"),
|
||||||
|
],
|
||||||
backup_id="1234",
|
backup_id="1234",
|
||||||
date="2024-12-02T07:23:58.261875-05:00",
|
date="2024-12-02T07:23:58.261875-05:00",
|
||||||
database_included=False,
|
database_included=False,
|
||||||
@ -320,7 +332,10 @@ async def test_encrypted_backup_streamer(hass: HomeAssistant) -> None:
|
|||||||
)
|
)
|
||||||
encrypted_backup_path = get_fixture_path("test_backups/c0cb53bd.tar", DOMAIN)
|
encrypted_backup_path = get_fixture_path("test_backups/c0cb53bd.tar", DOMAIN)
|
||||||
backup = AgentBackup(
|
backup = AgentBackup(
|
||||||
addons=["addon_1", "addon_2"],
|
addons=[
|
||||||
|
AddonInfo(name="Core 1", slug="core1", version="1.0.0"),
|
||||||
|
AddonInfo(name="Core 2", slug="core2", version="1.0.0"),
|
||||||
|
],
|
||||||
backup_id="1234",
|
backup_id="1234",
|
||||||
date="2024-12-02T07:23:58.261875-05:00",
|
date="2024-12-02T07:23:58.261875-05:00",
|
||||||
database_included=False,
|
database_included=False,
|
||||||
@ -353,15 +368,16 @@ async def test_encrypted_backup_streamer(hass: HomeAssistant) -> None:
|
|||||||
bytes.fromhex("00000000000000000000000000000000"),
|
bytes.fromhex("00000000000000000000000000000000"),
|
||||||
)
|
)
|
||||||
encryptor = EncryptedBackupStreamer(hass, backup, open_backup, "hunter2")
|
encryptor = EncryptedBackupStreamer(hass, backup, open_backup, "hunter2")
|
||||||
assert encryptor.backup() == dataclasses.replace(
|
|
||||||
backup, protected=True, size=backup.size + len(expected_padding)
|
|
||||||
)
|
|
||||||
|
|
||||||
encrypted_stream = await encryptor.open_stream()
|
assert encryptor.backup() == dataclasses.replace(
|
||||||
encrypted_output = b""
|
backup, protected=True, size=backup.size + len(expected_padding)
|
||||||
async for chunk in encrypted_stream:
|
)
|
||||||
encrypted_output += chunk
|
|
||||||
await encryptor.wait()
|
encrypted_stream = await encryptor.open_stream()
|
||||||
|
encrypted_output = b""
|
||||||
|
async for chunk in encrypted_stream:
|
||||||
|
encrypted_output += chunk
|
||||||
|
await encryptor.wait()
|
||||||
|
|
||||||
# Expect the output to match the stored encrypted backup file, with additional
|
# Expect the output to match the stored encrypted backup file, with additional
|
||||||
# padding.
|
# padding.
|
||||||
@ -377,7 +393,10 @@ async def test_encrypted_backup_streamer_interrupt_stuck_reader(
|
|||||||
"test_backups/c0cb53bd.tar.decrypted", DOMAIN
|
"test_backups/c0cb53bd.tar.decrypted", DOMAIN
|
||||||
)
|
)
|
||||||
backup = AgentBackup(
|
backup = AgentBackup(
|
||||||
addons=["addon_1", "addon_2"],
|
addons=[
|
||||||
|
AddonInfo(name="Core 1", slug="core1", version="1.0.0"),
|
||||||
|
AddonInfo(name="Core 2", slug="core2", version="1.0.0"),
|
||||||
|
],
|
||||||
backup_id="1234",
|
backup_id="1234",
|
||||||
date="2024-12-02T07:23:58.261875-05:00",
|
date="2024-12-02T07:23:58.261875-05:00",
|
||||||
database_included=False,
|
database_included=False,
|
||||||
@ -414,7 +433,10 @@ async def test_encrypted_backup_streamer_interrupt_stuck_writer(
|
|||||||
"test_backups/c0cb53bd.tar.decrypted", DOMAIN
|
"test_backups/c0cb53bd.tar.decrypted", DOMAIN
|
||||||
)
|
)
|
||||||
backup = AgentBackup(
|
backup = AgentBackup(
|
||||||
addons=["addon_1", "addon_2"],
|
addons=[
|
||||||
|
AddonInfo(name="Core 1", slug="core1", version="1.0.0"),
|
||||||
|
AddonInfo(name="Core 2", slug="core2", version="1.0.0"),
|
||||||
|
],
|
||||||
backup_id="1234",
|
backup_id="1234",
|
||||||
date="2024-12-02T07:23:58.261875-05:00",
|
date="2024-12-02T07:23:58.261875-05:00",
|
||||||
database_included=False,
|
database_included=False,
|
||||||
@ -447,7 +469,10 @@ async def test_encrypted_backup_streamer_random_nonce(hass: HomeAssistant) -> No
|
|||||||
)
|
)
|
||||||
encrypted_backup_path = get_fixture_path("test_backups/c0cb53bd.tar", DOMAIN)
|
encrypted_backup_path = get_fixture_path("test_backups/c0cb53bd.tar", DOMAIN)
|
||||||
backup = AgentBackup(
|
backup = AgentBackup(
|
||||||
addons=["addon_1", "addon_2"],
|
addons=[
|
||||||
|
AddonInfo(name="Core 1", slug="core1", version="1.0.0"),
|
||||||
|
AddonInfo(name="Core 2", slug="core2", version="1.0.0"),
|
||||||
|
],
|
||||||
backup_id="1234",
|
backup_id="1234",
|
||||||
date="2024-12-02T07:23:58.261875-05:00",
|
date="2024-12-02T07:23:58.261875-05:00",
|
||||||
database_included=False,
|
database_included=False,
|
||||||
@ -490,7 +515,7 @@ async def test_encrypted_backup_streamer_random_nonce(hass: HomeAssistant) -> No
|
|||||||
await encryptor1.wait()
|
await encryptor1.wait()
|
||||||
await encryptor2.wait()
|
await encryptor2.wait()
|
||||||
|
|
||||||
# Output from the two streames should differ but have the same length.
|
# Output from the two streams should differ but have the same length.
|
||||||
assert encrypted_output1 != encrypted_output3
|
assert encrypted_output1 != encrypted_output3
|
||||||
assert len(encrypted_output1) == len(encrypted_output3)
|
assert len(encrypted_output1) == len(encrypted_output3)
|
||||||
|
|
||||||
@ -508,7 +533,10 @@ async def test_encrypted_backup_streamer_error(hass: HomeAssistant) -> None:
|
|||||||
"test_backups/c0cb53bd.tar.decrypted", DOMAIN
|
"test_backups/c0cb53bd.tar.decrypted", DOMAIN
|
||||||
)
|
)
|
||||||
backup = AgentBackup(
|
backup = AgentBackup(
|
||||||
addons=["addon_1", "addon_2"],
|
addons=[
|
||||||
|
AddonInfo(name="Core 1", slug="core1", version="1.0.0"),
|
||||||
|
AddonInfo(name="Core 2", slug="core2", version="1.0.0"),
|
||||||
|
],
|
||||||
backup_id="1234",
|
backup_id="1234",
|
||||||
date="2024-12-02T07:23:58.261875-05:00",
|
date="2024-12-02T07:23:58.261875-05:00",
|
||||||
database_included=False,
|
database_included=False,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user