mirror of
https://github.com/home-assistant/supervisor.git
synced 2025-07-08 09:46:29 +00:00
Check path on extractall (#1336)
* Check path on extractall * code cleanup * Add logger * Fix issue * Add tests
This commit is contained in:
parent
a9ebb147c5
commit
05c8022db3
@ -51,6 +51,7 @@ from ..exceptions import (
|
|||||||
)
|
)
|
||||||
from ..utils.apparmor import adjust_profile
|
from ..utils.apparmor import adjust_profile
|
||||||
from ..utils.json import read_json_file, write_json_file
|
from ..utils.json import read_json_file, write_json_file
|
||||||
|
from ..utils.tar import secure_path
|
||||||
from .model import AddonModel, Data
|
from .model import AddonModel, Data
|
||||||
from .utils import remove_data
|
from .utils import remove_data
|
||||||
from .validate import SCHEMA_ADDON_SNAPSHOT, validate_options
|
from .validate import SCHEMA_ADDON_SNAPSHOT, validate_options
|
||||||
@ -579,7 +580,7 @@ class Addon(AddonModel):
|
|||||||
def _extract_tarfile():
|
def _extract_tarfile():
|
||||||
"""Extract tar snapshot."""
|
"""Extract tar snapshot."""
|
||||||
with tar_file as snapshot:
|
with tar_file as snapshot:
|
||||||
snapshot.extractall(path=Path(temp))
|
snapshot.extractall(path=Path(temp), member=secure_path(snapshot))
|
||||||
|
|
||||||
try:
|
try:
|
||||||
await self.sys_run_in_executor(_extract_tarfile)
|
await self.sys_run_in_executor(_extract_tarfile)
|
||||||
|
@ -41,7 +41,7 @@ from ..const import (
|
|||||||
from ..coresys import CoreSys, CoreSysAttributes
|
from ..coresys import CoreSys, CoreSysAttributes
|
||||||
from ..exceptions import AddonsError
|
from ..exceptions import AddonsError
|
||||||
from ..utils.json import write_json_file
|
from ..utils.json import write_json_file
|
||||||
from ..utils.tar import SecureTarFile
|
from ..utils.tar import SecureTarFile, secure_path
|
||||||
from .utils import key_to_iv, password_for_validating, password_to_key, remove_folder
|
from .utils import key_to_iv, password_for_validating, password_to_key, remove_folder
|
||||||
from .validate import ALL_FOLDERS, SCHEMA_SNAPSHOT
|
from .validate import ALL_FOLDERS, SCHEMA_SNAPSHOT
|
||||||
|
|
||||||
@ -248,7 +248,7 @@ class Snapshot(CoreSysAttributes):
|
|||||||
def _extract_snapshot():
|
def _extract_snapshot():
|
||||||
"""Extract a snapshot."""
|
"""Extract a snapshot."""
|
||||||
with tarfile.open(self.tarfile, "r:") as tar:
|
with tarfile.open(self.tarfile, "r:") as tar:
|
||||||
tar.extractall(path=self._tmp.name)
|
tar.extractall(path=self._tmp.name, members=secure_path(tar))
|
||||||
|
|
||||||
await self.sys_run_in_executor(_extract_snapshot)
|
await self.sys_run_in_executor(_extract_snapshot)
|
||||||
|
|
||||||
@ -396,7 +396,7 @@ class Snapshot(CoreSysAttributes):
|
|||||||
try:
|
try:
|
||||||
_LOGGER.info("Restore folder %s", name)
|
_LOGGER.info("Restore folder %s", name)
|
||||||
with SecureTarFile(tar_name, "r", key=self._key) as tar_file:
|
with SecureTarFile(tar_name, "r", key=self._key) as tar_file:
|
||||||
tar_file.extractall(path=origin_dir)
|
tar_file.extractall(path=origin_dir, members=tar_file)
|
||||||
_LOGGER.info("Restore folder %s done", name)
|
_LOGGER.info("Restore folder %s done", name)
|
||||||
except (tarfile.TarError, OSError) as err:
|
except (tarfile.TarError, OSError) as err:
|
||||||
_LOGGER.warning("Can't restore folder %s: %s", name, err)
|
_LOGGER.warning("Can't restore folder %s: %s", name, err)
|
||||||
|
@ -1,19 +1,22 @@
|
|||||||
"""Tarfile fileobject handler for encrypted files."""
|
"""Tarfile fileobject handler for encrypted files."""
|
||||||
import hashlib
|
import hashlib
|
||||||
|
import logging
|
||||||
import os
|
import os
|
||||||
from pathlib import Path
|
|
||||||
import tarfile
|
import tarfile
|
||||||
from typing import IO, Optional
|
from pathlib import Path
|
||||||
|
from typing import IO, Generator, Optional
|
||||||
|
|
||||||
from cryptography.hazmat.backends import default_backend
|
from cryptography.hazmat.backends import default_backend
|
||||||
from cryptography.hazmat.primitives import padding
|
from cryptography.hazmat.primitives import padding
|
||||||
from cryptography.hazmat.primitives.ciphers import (
|
from cryptography.hazmat.primitives.ciphers import (
|
||||||
CipherContext,
|
|
||||||
Cipher,
|
Cipher,
|
||||||
|
CipherContext,
|
||||||
algorithms,
|
algorithms,
|
||||||
modes,
|
modes,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
_LOGGER: logging.Logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
BLOCK_SIZE = 16
|
BLOCK_SIZE = 16
|
||||||
BLOCK_SIZE_BITS = 128
|
BLOCK_SIZE_BITS = 128
|
||||||
|
|
||||||
@ -111,3 +114,20 @@ def _generate_iv(key: bytes, salt: bytes) -> bytes:
|
|||||||
for _ in range(100):
|
for _ in range(100):
|
||||||
temp_iv = hashlib.sha256(temp_iv).digest()
|
temp_iv = hashlib.sha256(temp_iv).digest()
|
||||||
return temp_iv[:16]
|
return temp_iv[:16]
|
||||||
|
|
||||||
|
|
||||||
|
def secure_path(tar: tarfile.TarFile) -> Generator[tarfile.TarInfo, None, None]:
|
||||||
|
"""Security safe check of path.
|
||||||
|
|
||||||
|
Prevent ../ or absolut paths
|
||||||
|
"""
|
||||||
|
for member in tar:
|
||||||
|
file_path = Path(member.name)
|
||||||
|
try:
|
||||||
|
assert not file_path.is_absolute()
|
||||||
|
Path("/fake", file_path).resolve().relative_to("/fake")
|
||||||
|
except (ValueError, RuntimeError, AssertionError):
|
||||||
|
_LOGGER.warning("Issue with file %s", file_path)
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
yield member
|
||||||
|
33
tests/utils/test_tarfile.py
Normal file
33
tests/utils/test_tarfile.py
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
"""Test Tarfile functions."""
|
||||||
|
|
||||||
|
import attr
|
||||||
|
|
||||||
|
from hassio.utils.tar import secure_path
|
||||||
|
|
||||||
|
|
||||||
|
@attr.s
|
||||||
|
class TarInfo:
|
||||||
|
"""Fake TarInfo"""
|
||||||
|
|
||||||
|
name: str = attr.ib()
|
||||||
|
|
||||||
|
|
||||||
|
def test_secure_path():
|
||||||
|
"""Test Secure Path."""
|
||||||
|
test_list = [
|
||||||
|
TarInfo("test.txt"),
|
||||||
|
TarInfo("data/xy.blob"),
|
||||||
|
TarInfo("bla/blu/ble"),
|
||||||
|
TarInfo("data/../xy.blob"),
|
||||||
|
]
|
||||||
|
assert test_list == list(secure_path(test_list))
|
||||||
|
|
||||||
|
|
||||||
|
def test_not_secure_path():
|
||||||
|
"""Test Not secure path."""
|
||||||
|
test_list = [
|
||||||
|
TarInfo("/test.txt"),
|
||||||
|
TarInfo("data/../../xy.blob"),
|
||||||
|
TarInfo("/bla/blu/ble"),
|
||||||
|
]
|
||||||
|
assert [] == list(secure_path(test_list))
|
Loading…
x
Reference in New Issue
Block a user