mirror of
https://github.com/home-assistant/supervisor.git
synced 2026-02-18 08:17:35 +00:00
Compare commits
3 Commits
handle-con
...
backup-use
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
64f7d57b38 | ||
|
|
1efd591aca | ||
|
|
523a267524 |
@@ -20,7 +20,7 @@ from typing import Any, Final, cast
|
||||
import aiohttp
|
||||
from awesomeversion import AwesomeVersion, AwesomeVersionCompareException
|
||||
from deepmerge import Merger
|
||||
from securetar import AddFileError, SecureTarFile, atomic_contents_add, secure_path
|
||||
from securetar import AddFileError, SecureTarFile, atomic_contents_add
|
||||
import voluptuous as vol
|
||||
from voluptuous.humanize import humanize_error
|
||||
|
||||
@@ -1444,10 +1444,11 @@ class Addon(AddonModel):
|
||||
tmp = TemporaryDirectory(dir=self.sys_config.path_tmp)
|
||||
try:
|
||||
with tar_file as backup:
|
||||
# The tar filter rejects path traversal and absolute names,
|
||||
# aborting restore of potentially crafted backups.
|
||||
backup.extractall(
|
||||
path=tmp.name,
|
||||
members=secure_path(backup),
|
||||
filter="fully_trusted",
|
||||
filter="tar",
|
||||
)
|
||||
|
||||
data = read_json_file(Path(tmp.name, "addon.json"))
|
||||
|
||||
@@ -18,7 +18,7 @@ import time
|
||||
from typing import Any, Self, cast
|
||||
|
||||
from awesomeversion import AwesomeVersion, AwesomeVersionCompareException
|
||||
from securetar import AddFileError, SecureTarFile, atomic_contents_add, secure_path
|
||||
from securetar import AddFileError, SecureTarFile, atomic_contents_add
|
||||
import voluptuous as vol
|
||||
from voluptuous.humanize import humanize_error
|
||||
|
||||
@@ -513,10 +513,11 @@ class Backup(JobGroup):
|
||||
tmp = TemporaryDirectory(dir=str(backup_tarfile.parent))
|
||||
|
||||
with tarfile.open(backup_tarfile, "r:") as tar:
|
||||
# The tar filter rejects path traversal and absolute names,
|
||||
# aborting restore of potentially crafted backups.
|
||||
tar.extractall(
|
||||
path=tmp.name,
|
||||
members=secure_path(tar),
|
||||
filter="fully_trusted",
|
||||
filter="tar",
|
||||
)
|
||||
|
||||
return tmp
|
||||
@@ -798,8 +799,11 @@ class Backup(JobGroup):
|
||||
bufsize=BUF_SIZE,
|
||||
password=self._password,
|
||||
) as tar_file:
|
||||
# The tar filter rejects path traversal and absolute names,
|
||||
# aborting restore of potentially crafted backups.
|
||||
tar_file.extractall(
|
||||
path=origin_dir, members=tar_file, filter="fully_trusted"
|
||||
path=origin_dir,
|
||||
filter="tar",
|
||||
)
|
||||
_LOGGER.info("Restore folder %s done", name)
|
||||
except (tarfile.TarError, OSError) as err:
|
||||
|
||||
@@ -13,7 +13,7 @@ from typing import Any
|
||||
from uuid import UUID
|
||||
|
||||
from awesomeversion import AwesomeVersion, AwesomeVersionException
|
||||
from securetar import AddFileError, SecureTarFile, atomic_contents_add, secure_path
|
||||
from securetar import AddFileError, SecureTarFile, atomic_contents_add
|
||||
import voluptuous as vol
|
||||
from voluptuous.humanize import humanize_error
|
||||
|
||||
@@ -495,10 +495,11 @@ class HomeAssistant(FileConfiguration, CoreSysAttributes):
|
||||
# extract backup
|
||||
try:
|
||||
with tar_file as backup:
|
||||
# The tar filter rejects path traversal and absolute names,
|
||||
# aborting restore of potentially crafted backups.
|
||||
backup.extractall(
|
||||
path=temp_path,
|
||||
members=secure_path(backup),
|
||||
filter="fully_trusted",
|
||||
filter="tar",
|
||||
)
|
||||
except tarfile.TarError as err:
|
||||
raise HomeAssistantError(
|
||||
|
||||
139
tests/backups/test_backup_security.py
Normal file
139
tests/backups/test_backup_security.py
Normal file
@@ -0,0 +1,139 @@
|
||||
"""Security tests for backup tar extraction with tar filter."""
|
||||
|
||||
import io
|
||||
from pathlib import Path
|
||||
import tarfile
|
||||
|
||||
|
||||
def _create_tar_gz(
|
||||
path: Path,
|
||||
members: list[tarfile.TarInfo],
|
||||
file_data: dict[str, bytes] | None = None,
|
||||
) -> None:
|
||||
"""Create a tar.gz file with specified members."""
|
||||
if file_data is None:
|
||||
file_data = {}
|
||||
with tarfile.open(path, "w:gz") as tar:
|
||||
for info in members:
|
||||
data = file_data.get(info.name)
|
||||
if data is not None:
|
||||
tar.addfile(info, io.BytesIO(data))
|
||||
else:
|
||||
tar.addfile(info)
|
||||
|
||||
|
||||
def test_path_traversal_rejected(tmp_path: Path):
|
||||
"""Test that path traversal in member names is rejected."""
|
||||
traversal_info = tarfile.TarInfo(name="../../etc/passwd")
|
||||
traversal_info.size = 9
|
||||
tar_path = tmp_path / "test.tar.gz"
|
||||
_create_tar_gz(tar_path, [traversal_info], {"../../etc/passwd": b"malicious"})
|
||||
|
||||
dest = tmp_path / "out"
|
||||
dest.mkdir()
|
||||
with tarfile.open(tar_path, "r:gz") as tar:
|
||||
try:
|
||||
tar.extractall(path=dest, filter="tar")
|
||||
except tarfile.OutsideDestinationError:
|
||||
pass
|
||||
else:
|
||||
raise AssertionError("Expected OutsideDestinationError")
|
||||
|
||||
|
||||
def test_symlink_write_through_rejected(tmp_path: Path):
|
||||
"""Test that writing through a symlink to outside destination is rejected.
|
||||
|
||||
The tar filter's realpath check follows already-extracted symlinks on disk,
|
||||
catching write-through attacks even without explicit link target validation.
|
||||
"""
|
||||
# Symlink pointing outside, then a file entry writing through it
|
||||
link_info = tarfile.TarInfo(name="escape")
|
||||
link_info.type = tarfile.SYMTYPE
|
||||
link_info.linkname = "../outside"
|
||||
file_info = tarfile.TarInfo(name="escape/evil.py")
|
||||
file_info.size = 9
|
||||
tar_path = tmp_path / "test.tar.gz"
|
||||
_create_tar_gz(
|
||||
tar_path,
|
||||
[link_info, file_info],
|
||||
{"escape/evil.py": b"malicious"},
|
||||
)
|
||||
|
||||
dest = tmp_path / "out"
|
||||
dest.mkdir()
|
||||
with tarfile.open(tar_path, "r:gz") as tar:
|
||||
try:
|
||||
tar.extractall(path=dest, filter="tar")
|
||||
except tarfile.OutsideDestinationError:
|
||||
pass
|
||||
else:
|
||||
raise AssertionError("Expected OutsideDestinationError")
|
||||
|
||||
# The evil file must not exist outside the destination
|
||||
assert not (tmp_path / "outside" / "evil.py").exists()
|
||||
|
||||
|
||||
def test_absolute_name_stripped_and_extracted(tmp_path: Path):
|
||||
"""Test that absolute member names have leading / stripped and extract safely."""
|
||||
info = tarfile.TarInfo(name="/etc/test.conf")
|
||||
info.size = 5
|
||||
tar_path = tmp_path / "test.tar.gz"
|
||||
_create_tar_gz(tar_path, [info], {"/etc/test.conf": b"hello"})
|
||||
|
||||
dest = tmp_path / "out"
|
||||
dest.mkdir()
|
||||
with tarfile.open(tar_path, "r:gz") as tar:
|
||||
tar.extractall(path=dest, filter="tar")
|
||||
|
||||
# Extracted inside destination with leading / stripped
|
||||
assert (dest / "etc" / "test.conf").read_text() == "hello"
|
||||
|
||||
|
||||
def test_valid_backup_with_internal_symlinks(tmp_path: Path):
|
||||
"""Test that valid backups with internal relative symlinks extract correctly."""
|
||||
dir_info = tarfile.TarInfo(name="subdir")
|
||||
dir_info.type = tarfile.DIRTYPE
|
||||
dir_info.mode = 0o755
|
||||
|
||||
file_info = tarfile.TarInfo(name="subdir/config.yaml")
|
||||
file_info.size = 11
|
||||
|
||||
link_info = tarfile.TarInfo(name="config_link")
|
||||
link_info.type = tarfile.SYMTYPE
|
||||
link_info.linkname = "subdir/config.yaml"
|
||||
|
||||
tar_path = tmp_path / "test.tar.gz"
|
||||
_create_tar_gz(
|
||||
tar_path,
|
||||
[dir_info, file_info, link_info],
|
||||
{"subdir/config.yaml": b"key: value\n"},
|
||||
)
|
||||
|
||||
dest = tmp_path / "out"
|
||||
dest.mkdir()
|
||||
with tarfile.open(tar_path, "r:gz") as tar:
|
||||
tar.extractall(path=dest, filter="tar")
|
||||
|
||||
assert (dest / "subdir" / "config.yaml").read_text() == "key: value\n"
|
||||
assert (dest / "config_link").is_symlink()
|
||||
assert (dest / "config_link").read_text() == "key: value\n"
|
||||
|
||||
|
||||
def test_uid_gid_preserved(tmp_path: Path):
|
||||
"""Test that tar filter preserves file ownership."""
|
||||
info = tarfile.TarInfo(name="owned_file.txt")
|
||||
info.size = 5
|
||||
info.uid = 1000
|
||||
info.gid = 1000
|
||||
tar_path = tmp_path / "test.tar.gz"
|
||||
_create_tar_gz(tar_path, [info], {"owned_file.txt": b"hello"})
|
||||
|
||||
dest = tmp_path / "out"
|
||||
dest.mkdir()
|
||||
with tarfile.open(tar_path, "r:gz") as tar:
|
||||
# Extract member via filter only (don't actually extract, just check
|
||||
# the filter preserves uid/gid)
|
||||
for member in tar:
|
||||
filtered = tarfile.tar_filter(member, str(dest))
|
||||
assert filtered.uid == 1000
|
||||
assert filtered.gid == 1000
|
||||
Reference in New Issue
Block a user