Add snapshot_exclude option (#1337)

* Add snapshot tar filter

* Add filter to add-on

* Fix bug

* Fix
This commit is contained in:
Pascal Vizeli 2019-10-21 14:48:24 +02:00 committed by GitHub
parent 05c8022db3
commit 11811701d0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 74 additions and 11 deletions

View File

@ -51,7 +51,7 @@ from ..exceptions import (
)
from ..utils.apparmor import adjust_profile
from ..utils.json import read_json_file, write_json_file
from ..utils.tar import secure_path
from ..utils.tar import exclude_filter, secure_path
from .model import AddonModel, Data
from .utils import remove_data
from .validate import SCHEMA_ADDON_SNAPSHOT, validate_options
@ -526,7 +526,7 @@ class Addon(AddonModel):
async def snapshot(self, tar_file: tarfile.TarFile) -> None:
"""Snapshot state of an add-on."""
with TemporaryDirectory(dir=str(self.sys_config.path_tmp)) as temp:
with TemporaryDirectory(dir=self.sys_config.path_tmp) as temp:
# store local image
if self.need_build:
try:
@ -561,8 +561,15 @@ class Addon(AddonModel):
def _write_tarfile():
"""Write tar inside loop."""
with tar_file as snapshot:
# Snapshot system
snapshot.add(temp, arcname=".")
snapshot.add(self.path_data, arcname="data")
# Snapshot data
snapshot.add(
self.path_data,
arcname="data",
filter=exclude_filter(self.snapshot_exclude),
)
try:
_LOGGER.info("Build snapshot for add-on %s", self.slug)
@ -575,12 +582,12 @@ class Addon(AddonModel):
async def restore(self, tar_file: tarfile.TarFile) -> None:
"""Restore state of an add-on."""
with TemporaryDirectory(dir=str(self.sys_config.path_tmp)) as temp:
with TemporaryDirectory(dir=self.sys_config.path_tmp) as temp:
# extract snapshot
def _extract_tarfile():
"""Extract tar snapshot."""
with tar_file as snapshot:
snapshot.extractall(path=Path(temp), member=secure_path(snapshot))
snapshot.extractall(path=Path(temp), members=secure_path(snapshot))
try:
await self.sys_run_in_executor(_extract_tarfile)
@ -641,7 +648,7 @@ class Addon(AddonModel):
# Restore data
def _restore_data():
"""Restore data."""
shutil.copytree(str(Path(temp, "data")), str(self.path_data))
shutil.copytree(Path(temp, "data"), self.path_data)
_LOGGER.info("Restore data for addon %s", self.slug)
if self.path_data.is_dir():

View File

@ -47,6 +47,7 @@ from ..const import (
ATTR_SCHEMA,
ATTR_SERVICES,
ATTR_SLUG,
ATTR_SNAPSHOT_EXCLUDE,
ATTR_STARTUP,
ATTR_STDIN,
ATTR_TIMEOUT,
@ -324,6 +325,11 @@ class AddonModel(CoreSysAttributes):
"""Return Hass.io role for API."""
return self.data[ATTR_HASSIO_ROLE]
@property
def snapshot_exclude(self) -> List[str]:
"""Return Exclude list for snapshot."""
return self.data.get(ATTR_SNAPSHOT_EXCLUDE, [])
@property
def with_stdin(self) -> bool:
"""Return True if the add-on access use stdin input."""

View File

@ -62,6 +62,7 @@ from ..const import (
ATTR_SCHEMA,
ATTR_SERVICES,
ATTR_SLUG,
ATTR_SNAPSHOT_EXCLUDE,
ATTR_SQUASH,
ATTR_STARTUP,
ATTR_STATE,
@ -214,6 +215,7 @@ SCHEMA_ADDON_CONFIG = vol.Schema(
vol.Optional(ATTR_AUTH_API, default=False): vol.Boolean(),
vol.Optional(ATTR_SERVICES): [vol.Match(RE_SERVICE)],
vol.Optional(ATTR_DISCOVERY): [valid_discovery_service],
vol.Optional(ATTR_SNAPSHOT_EXCLUDE): [vol.Coerce(str)],
vol.Required(ATTR_OPTIONS): dict,
vol.Required(ATTR_SCHEMA): vol.Any(
vol.Schema(

View File

@ -221,6 +221,7 @@ ATTR_SERVERS = "servers"
ATTR_LOCALS = "locals"
ATTR_UDEV = "udev"
ATTR_VALUE = "value"
ATTR_SNAPSHOT_EXCLUDE = "snapshot_exclude"
PROVIDE_SERVICE = "provide"
NEED_SERVICE = "need"

View File

@ -42,7 +42,7 @@ def remove_folder(folder):
for obj in folder.iterdir():
try:
if obj.is_dir():
shutil.rmtree(str(obj), ignore_errors=True)
shutil.rmtree(obj, ignore_errors=True)
else:
obj.unlink()
except (OSError, shutil.Error):

View File

@ -137,7 +137,7 @@ class GitRepo(CoreSysAttributes):
"""Log error."""
_LOGGER.warning("Can't remove %s", path)
shutil.rmtree(str(self.path), onerror=log_err)
shutil.rmtree(self.path, onerror=log_err)
class GitRepoHassIO(GitRepo):

View File

@ -2,9 +2,9 @@
import hashlib
import logging
import os
import tarfile
from pathlib import Path
from typing import IO, Generator, Optional
import tarfile
from typing import IO, Callable, Generator, List, Optional
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import padding
@ -131,3 +131,22 @@ def secure_path(tar: tarfile.TarFile) -> Generator[tarfile.TarInfo, None, None]:
continue
else:
yield member
def exclude_filter(
exclude_list: List[str]
) -> Callable[[tarfile.TarInfo], Optional[tarfile.TarInfo]]:
"""Create callable filter function to check TarInfo for add."""
def my_filter(tar: tarfile.TarInfo) -> Optional[tarfile.TarInfo]:
"""Custom exclude filter."""
file_path = Path(tar.name)
for exclude in exclude_list:
if not file_path.match(exclude):
continue
_LOGGER.debug("Ignore %s because of %s", file_path, exclude)
return None
return tar
return my_filter

View File

@ -1,8 +1,9 @@
"""Test Tarfile functions."""
import attr
import pytest
from hassio.utils.tar import secure_path
from hassio.utils.tar import secure_path, exclude_filter
@attr.s
@ -31,3 +32,30 @@ def test_not_secure_path():
TarInfo("/bla/blu/ble"),
]
assert [] == list(secure_path(test_list))
def test_exclude_filter_good():
"""Test exclude filter."""
filter_funct = exclude_filter(["not/match", "/dev/xy"])
test_list = [
TarInfo("test.txt"),
TarInfo("data/xy.blob"),
TarInfo("bla/blu/ble"),
TarInfo("data/../xy.blob"),
]
assert test_list == [filter_funct(result) for result in test_list]
def test_exclude_filter_bad():
"""Test exclude filter."""
filter_funct = exclude_filter(["*.txt", "data/*", "bla/blu/ble"])
test_list = [
TarInfo("test.txt"),
TarInfo("data/xy.blob"),
TarInfo("bla/blu/ble"),
TarInfo("data/test_files/kk.txt"),
]
for info in [filter_funct(result) for result in test_list]:
assert info is None