diff --git a/hassio/addons/addon.py b/hassio/addons/addon.py index e47c593d4..50e6f9eac 100644 --- a/hassio/addons/addon.py +++ b/hassio/addons/addon.py @@ -51,7 +51,7 @@ from ..exceptions import ( ) from ..utils.apparmor import adjust_profile from ..utils.json import read_json_file, write_json_file -from ..utils.tar import secure_path +from ..utils.tar import exclude_filter, secure_path from .model import AddonModel, Data from .utils import remove_data from .validate import SCHEMA_ADDON_SNAPSHOT, validate_options @@ -526,7 +526,7 @@ class Addon(AddonModel): async def snapshot(self, tar_file: tarfile.TarFile) -> None: """Snapshot state of an add-on.""" - with TemporaryDirectory(dir=str(self.sys_config.path_tmp)) as temp: + with TemporaryDirectory(dir=self.sys_config.path_tmp) as temp: # store local image if self.need_build: try: @@ -561,8 +561,15 @@ class Addon(AddonModel): def _write_tarfile(): """Write tar inside loop.""" with tar_file as snapshot: + # Snapshot system snapshot.add(temp, arcname=".") - snapshot.add(self.path_data, arcname="data") + + # Snapshot data + snapshot.add( + self.path_data, + arcname="data", + filter=exclude_filter(self.snapshot_exclude), + ) try: _LOGGER.info("Build snapshot for add-on %s", self.slug) @@ -575,12 +582,12 @@ class Addon(AddonModel): async def restore(self, tar_file: tarfile.TarFile) -> None: """Restore state of an add-on.""" - with TemporaryDirectory(dir=str(self.sys_config.path_tmp)) as temp: + with TemporaryDirectory(dir=self.sys_config.path_tmp) as temp: # extract snapshot def _extract_tarfile(): """Extract tar snapshot.""" with tar_file as snapshot: - snapshot.extractall(path=Path(temp), member=secure_path(snapshot)) + snapshot.extractall(path=Path(temp), members=secure_path(snapshot)) try: await self.sys_run_in_executor(_extract_tarfile) @@ -641,7 +648,7 @@ class Addon(AddonModel): # Restore data def _restore_data(): """Restore data.""" - shutil.copytree(str(Path(temp, "data")), str(self.path_data)) + shutil.copytree(Path(temp, "data"), self.path_data) _LOGGER.info("Restore data for addon %s", self.slug) if self.path_data.is_dir(): diff --git a/hassio/addons/model.py b/hassio/addons/model.py index ffdada1c9..6cc799b44 100644 --- a/hassio/addons/model.py +++ b/hassio/addons/model.py @@ -47,6 +47,7 @@ from ..const import ( ATTR_SCHEMA, ATTR_SERVICES, ATTR_SLUG, + ATTR_SNAPSHOT_EXCLUDE, ATTR_STARTUP, ATTR_STDIN, ATTR_TIMEOUT, @@ -324,6 +325,11 @@ class AddonModel(CoreSysAttributes): """Return Hass.io role for API.""" return self.data[ATTR_HASSIO_ROLE] + @property + def snapshot_exclude(self) -> List[str]: + """Return Exclude list for snapshot.""" + return self.data.get(ATTR_SNAPSHOT_EXCLUDE, []) + @property def with_stdin(self) -> bool: """Return True if the add-on access use stdin input.""" diff --git a/hassio/addons/validate.py b/hassio/addons/validate.py index 87b695887..d47c9e2d9 100644 --- a/hassio/addons/validate.py +++ b/hassio/addons/validate.py @@ -62,6 +62,7 @@ from ..const import ( ATTR_SCHEMA, ATTR_SERVICES, ATTR_SLUG, + ATTR_SNAPSHOT_EXCLUDE, ATTR_SQUASH, ATTR_STARTUP, ATTR_STATE, @@ -214,6 +215,7 @@ SCHEMA_ADDON_CONFIG = vol.Schema( vol.Optional(ATTR_AUTH_API, default=False): vol.Boolean(), vol.Optional(ATTR_SERVICES): [vol.Match(RE_SERVICE)], vol.Optional(ATTR_DISCOVERY): [valid_discovery_service], + vol.Optional(ATTR_SNAPSHOT_EXCLUDE): [vol.Coerce(str)], vol.Required(ATTR_OPTIONS): dict, vol.Required(ATTR_SCHEMA): vol.Any( vol.Schema( diff --git a/hassio/const.py b/hassio/const.py index 223534629..6194f771b 100644 --- a/hassio/const.py +++ b/hassio/const.py @@ -221,6 +221,7 @@ ATTR_SERVERS = "servers" ATTR_LOCALS = "locals" ATTR_UDEV = "udev" ATTR_VALUE = "value" +ATTR_SNAPSHOT_EXCLUDE = "snapshot_exclude" PROVIDE_SERVICE = "provide" NEED_SERVICE = "need" diff --git a/hassio/snapshots/utils.py b/hassio/snapshots/utils.py index 2f9d73bce..a2403a482 100644 --- a/hassio/snapshots/utils.py +++ b/hassio/snapshots/utils.py @@ -42,7 +42,7 @@ def remove_folder(folder): for obj in folder.iterdir(): try: if obj.is_dir(): - shutil.rmtree(str(obj), ignore_errors=True) + shutil.rmtree(obj, ignore_errors=True) else: obj.unlink() except (OSError, shutil.Error): diff --git a/hassio/store/git.py b/hassio/store/git.py index 7bff4f6ab..a60799230 100644 --- a/hassio/store/git.py +++ b/hassio/store/git.py @@ -137,7 +137,7 @@ class GitRepo(CoreSysAttributes): """Log error.""" _LOGGER.warning("Can't remove %s", path) - shutil.rmtree(str(self.path), onerror=log_err) + shutil.rmtree(self.path, onerror=log_err) class GitRepoHassIO(GitRepo): diff --git a/hassio/utils/tar.py b/hassio/utils/tar.py index 9d5be5741..27f12fa94 100644 --- a/hassio/utils/tar.py +++ b/hassio/utils/tar.py @@ -2,9 +2,9 @@ import hashlib import logging import os -import tarfile from pathlib import Path -from typing import IO, Generator, Optional +import tarfile +from typing import IO, Callable, Generator, List, Optional from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import padding @@ -131,3 +131,22 @@ def secure_path(tar: tarfile.TarFile) -> Generator[tarfile.TarInfo, None, None]: continue else: yield member + + +def exclude_filter( + exclude_list: List[str] +) -> Callable[[tarfile.TarInfo], Optional[tarfile.TarInfo]]: + """Create callable filter function to check TarInfo for add.""" + + def my_filter(tar: tarfile.TarInfo) -> Optional[tarfile.TarInfo]: + """Custom exclude filter.""" + file_path = Path(tar.name) + for exclude in exclude_list: + if not file_path.match(exclude): + continue + _LOGGER.debug("Ignore %s because of %s", file_path, exclude) + return None + + return tar + + return my_filter diff --git a/tests/utils/test_tarfile.py b/tests/utils/test_tarfile.py index ed61871ea..e0a5e31e7 100644 --- a/tests/utils/test_tarfile.py +++ b/tests/utils/test_tarfile.py @@ -1,8 +1,9 @@ """Test Tarfile functions.""" import attr +import pytest -from hassio.utils.tar import secure_path +from hassio.utils.tar import secure_path, exclude_filter @attr.s @@ -31,3 +32,30 @@ def test_not_secure_path(): TarInfo("/bla/blu/ble"), ] assert [] == list(secure_path(test_list)) + + +def test_exclude_filter_good(): + """Test exclude filter.""" + filter_funct = exclude_filter(["not/match", "/dev/xy"]) + test_list = [ + TarInfo("test.txt"), + TarInfo("data/xy.blob"), + TarInfo("bla/blu/ble"), + TarInfo("data/../xy.blob"), + ] + + assert test_list == [filter_funct(result) for result in test_list] + + +def test_exclude_filter_bad(): + """Test exclude filter.""" + filter_funct = exclude_filter(["*.txt", "data/*", "bla/blu/ble"]) + test_list = [ + TarInfo("test.txt"), + TarInfo("data/xy.blob"), + TarInfo("bla/blu/ble"), + TarInfo("data/test_files/kk.txt"), + ] + + for info in [filter_funct(result) for result in test_list]: + assert info is None