New Backup format for core (#3451)

* New Backup format for core

* cleanup

* next round

* partial resotre encrypted function of metadata

* Using securetar as modul

* fix securetar imports

* simplify

* more typing

* adjust handling

* Handling replace better

* increase the order

* more logic cleanup

* create new core backup

* handle restore

* fix tests

* better checks

* Fix attribute issues

* Update supervisor/backups/manager.py

Co-authored-by: Joakim Sørensen <joasoe@gmail.com>

* Address comments

* Fix tests

* Update supervisor/exceptions.py

Co-authored-by: Joakim Sørensen <joasoe@gmail.com>

Co-authored-by: Joakim Sørensen <joasoe@gmail.com>
This commit is contained in:
Pascal Vizeli 2022-02-23 16:08:02 +01:00 committed by GitHub
parent 842c4b3864
commit 9104b287e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 274 additions and 591 deletions

View File

@ -18,6 +18,7 @@ jinja2==3.0.3
pulsectl==22.1.3 pulsectl==22.1.3
pyudev==0.23.2 pyudev==0.23.2
ruamel.yaml==0.17.17 ruamel.yaml==0.17.17
securetar==2022.2.0
sentry-sdk==1.5.6 sentry-sdk==1.5.6
voluptuous==0.12.2 voluptuous==0.12.2
dbus-next==0.2.3 dbus-next==0.2.3

View File

@ -14,6 +14,7 @@ from typing import Any, Awaitable, Final, Optional
import aiohttp import aiohttp
from deepmerge import Merger from deepmerge import Merger
from securetar import atomic_contents_add, secure_path
import voluptuous as vol import voluptuous as vol
from voluptuous.humanize import humanize_error from voluptuous.humanize import humanize_error
@ -65,7 +66,6 @@ from ..homeassistant.const import WSEvent, WSType
from ..utils import check_port from ..utils import check_port
from ..utils.apparmor import adjust_profile from ..utils.apparmor import adjust_profile
from ..utils.json import read_json_file, write_json_file from ..utils.json import read_json_file, write_json_file
from ..utils.tar import atomic_contents_add, secure_path
from .const import AddonBackupMode from .const import AddonBackupMode
from .model import AddonModel, Data from .model import AddonModel, Data
from .options import AddonOptions from .options import AddonOptions
@ -748,8 +748,7 @@ class Addon(AddonModel):
def _write_tarfile(): def _write_tarfile():
"""Write tar inside loop.""" """Write tar inside loop."""
with tar_file as backup: with tar_file as backup:
# Backup system # Backup metadata
backup.add(temp, arcname=".") backup.add(temp, arcname=".")
# Backup data # Backup data
@ -816,12 +815,10 @@ class Addon(AddonModel):
try: try:
data = SCHEMA_ADDON_BACKUP(data) data = SCHEMA_ADDON_BACKUP(data)
except vol.Invalid as err: except vol.Invalid as err:
_LOGGER.error( raise AddonsError(
"Can't validate %s, backup data: %s", f"Can't validate {self.slug}, backup data: {humanize_error(data, err)}",
self.slug, _LOGGER.error,
humanize_error(data, err), ) from err
)
raise AddonsError() from err
# If available # If available
if not self._available(data[ATTR_SYSTEM]): if not self._available(data[ATTR_SYSTEM]):

View File

@ -5,50 +5,43 @@ import logging
from pathlib import Path from pathlib import Path
import tarfile import tarfile
from tempfile import TemporaryDirectory from tempfile import TemporaryDirectory
from typing import Any, Optional from typing import Any, Awaitable, Optional
from awesomeversion import AwesomeVersionCompareException
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import padding from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from securetar import SecureTarFile, atomic_contents_add, secure_path
import voluptuous as vol import voluptuous as vol
from voluptuous.humanize import humanize_error from voluptuous.humanize import humanize_error
from ..addons import Addon from ..addons import Addon
from ..const import ( from ..const import (
ATTR_ADDONS, ATTR_ADDONS,
ATTR_AUDIO_INPUT,
ATTR_AUDIO_OUTPUT,
ATTR_BOOT,
ATTR_COMPRESSED, ATTR_COMPRESSED,
ATTR_CRYPTO, ATTR_CRYPTO,
ATTR_DATE, ATTR_DATE,
ATTR_DOCKER, ATTR_DOCKER,
ATTR_FOLDERS, ATTR_FOLDERS,
ATTR_HOMEASSISTANT, ATTR_HOMEASSISTANT,
ATTR_IMAGE,
ATTR_NAME, ATTR_NAME,
ATTR_PASSWORD, ATTR_PASSWORD,
ATTR_PORT,
ATTR_PROTECTED, ATTR_PROTECTED,
ATTR_REFRESH_TOKEN,
ATTR_REGISTRIES, ATTR_REGISTRIES,
ATTR_REPOSITORIES, ATTR_REPOSITORIES,
ATTR_SIZE, ATTR_SIZE,
ATTR_SLUG, ATTR_SLUG,
ATTR_SSL,
ATTR_TYPE, ATTR_TYPE,
ATTR_USERNAME, ATTR_USERNAME,
ATTR_VERSION, ATTR_VERSION,
ATTR_WAIT_BOOT,
ATTR_WATCHDOG,
CRYPTO_AES128, CRYPTO_AES128,
FOLDER_HOMEASSISTANT,
) )
from ..coresys import CoreSys, CoreSysAttributes from ..coresys import CoreSys, CoreSysAttributes
from ..exceptions import AddonsError from ..exceptions import AddonsError, BackupError
from ..utils import remove_folder
from ..utils.json import write_json_file from ..utils.json import write_json_file
from ..utils.tar import SecureTarFile, atomic_contents_add, secure_path from .const import BackupType
from .utils import key_to_iv, password_for_validating, password_to_key, remove_folder from .utils import key_to_iv, password_to_key
from .validate import SCHEMA_BACKUP from .validate import SCHEMA_BACKUP
_LOGGER: logging.Logger = logging.getLogger(__name__) _LOGGER: logging.Logger = logging.getLogger(__name__)
@ -67,17 +60,22 @@ class Backup(CoreSysAttributes):
self._aes: Optional[Cipher] = None self._aes: Optional[Cipher] = None
@property @property
def slug(self): def version(self) -> int:
"""Return backup version."""
return self._data[ATTR_VERSION]
@property
def slug(self) -> str:
"""Return backup slug.""" """Return backup slug."""
return self._data.get(ATTR_SLUG) return self._data[ATTR_SLUG]
@property @property
def sys_type(self): def sys_type(self) -> BackupType:
"""Return backup type.""" """Return backup type."""
return self._data.get(ATTR_TYPE) return self._data[ATTR_TYPE]
@property @property
def name(self): def name(self) -> str:
"""Return backup name.""" """Return backup name."""
return self._data[ATTR_NAME] return self._data[ATTR_NAME]
@ -87,14 +85,14 @@ class Backup(CoreSysAttributes):
return self._data[ATTR_DATE] return self._data[ATTR_DATE]
@property @property
def protected(self): def protected(self) -> bool:
"""Return backup date.""" """Return backup date."""
return self._data.get(ATTR_PROTECTED) is not None return self._data[ATTR_PROTECTED]
@property @property
def compressed(self): def compressed(self) -> bool:
"""Return whether backup is compressed.""" """Return whether backup is compressed."""
return self._data.get(ATTR_COMPRESSED) return self._data[ATTR_COMPRESSED]
@property @property
def addons(self): def addons(self):
@ -161,6 +159,7 @@ class Backup(CoreSysAttributes):
def new(self, slug, name, date, sys_type, password=None, compressed=True): def new(self, slug, name, date, sys_type, password=None, compressed=True):
"""Initialize a new backup.""" """Initialize a new backup."""
# Init metadata # Init metadata
self._data[ATTR_VERSION] = 2
self._data[ATTR_SLUG] = slug self._data[ATTR_SLUG] = slug
self._data[ATTR_NAME] = name self._data[ATTR_NAME] = name
self._data[ATTR_DATE] = date self._data[ATTR_DATE] = date
@ -172,7 +171,7 @@ class Backup(CoreSysAttributes):
# Set password # Set password
if password: if password:
self._init_password(password) self._init_password(password)
self._data[ATTR_PROTECTED] = password_for_validating(password) self._data[ATTR_PROTECTED] = True
self._data[ATTR_CRYPTO] = CRYPTO_AES128 self._data[ATTR_CRYPTO] = CRYPTO_AES128
if not compressed: if not compressed:
@ -182,11 +181,6 @@ class Backup(CoreSysAttributes):
"""Set the password for an existing backup.""" """Set the password for an existing backup."""
if not password: if not password:
return False return False
validating = password_for_validating(password)
if validating != self._data[ATTR_PROTECTED]:
return False
self._init_password(password) self._init_password(password)
return True return True
@ -419,7 +413,7 @@ class Backup(CoreSysAttributes):
async def restore_folders(self, folder_list: list[str]): async def restore_folders(self, folder_list: list[str]):
"""Backup Supervisor data into backup.""" """Backup Supervisor data into backup."""
def _folder_restore(name: str): async def _folder_restore(name: str) -> None:
"""Intenal function to restore a folder.""" """Intenal function to restore a folder."""
slug_name = name.replace("/", "_") slug_name = name.replace("/", "_")
tar_name = Path( tar_name = Path(
@ -434,68 +428,33 @@ class Backup(CoreSysAttributes):
# Clean old stuff # Clean old stuff
if origin_dir.is_dir(): if origin_dir.is_dir():
remove_folder(origin_dir) await remove_folder(origin_dir, content_only=True)
# Perform a restore # Perform a restore
try: def _restore() -> None:
_LOGGER.info("Restore folder %s", name) try:
with SecureTarFile( _LOGGER.info("Restore folder %s", name)
tar_name, "r", key=self._key, gzip=self.compressed with SecureTarFile(
) as tar_file: tar_name, "r", key=self._key, gzip=self.compressed
tar_file.extractall(path=origin_dir, members=tar_file) ) as tar_file:
_LOGGER.info("Restore folder %s done", name) tar_file.extractall(path=origin_dir, members=tar_file)
except (tarfile.TarError, OSError) as err: _LOGGER.info("Restore folder %s done", name)
_LOGGER.warning("Can't restore folder %s: %s", name, err) except (tarfile.TarError, OSError) as err:
_LOGGER.warning("Can't restore folder %s: %s", name, err)
await self.sys_run_in_executor(_restore, name)
# Restore folder sequential # Restore folder sequential
# avoid issue on slow IO # avoid issue on slow IO
for folder in folder_list: for folder in folder_list:
try: try:
await self.sys_run_in_executor(_folder_restore, folder) await _folder_restore(folder)
except Exception as err: # pylint: disable=broad-except except Exception as err: # pylint: disable=broad-except
_LOGGER.warning("Can't restore folder %s: %s", folder, err) _LOGGER.warning("Can't restore folder %s: %s", folder, err)
def store_homeassistant(self): async def store_homeassistant(self):
"""Read all data from Home Assistant object."""
self.homeassistant[ATTR_VERSION] = self.sys_homeassistant.version
self.homeassistant[ATTR_WATCHDOG] = self.sys_homeassistant.watchdog
self.homeassistant[ATTR_BOOT] = self.sys_homeassistant.boot
self.homeassistant[ATTR_WAIT_BOOT] = self.sys_homeassistant.wait_boot
self.homeassistant[ATTR_IMAGE] = self.sys_homeassistant.image
# API/Proxy
self.homeassistant[ATTR_PORT] = self.sys_homeassistant.api_port
self.homeassistant[ATTR_SSL] = self.sys_homeassistant.api_ssl
self.homeassistant[ATTR_REFRESH_TOKEN] = self._encrypt_data(
self.sys_homeassistant.refresh_token
)
# Audio
self.homeassistant[ATTR_AUDIO_INPUT] = self.sys_homeassistant.audio_input
self.homeassistant[ATTR_AUDIO_OUTPUT] = self.sys_homeassistant.audio_output
def restore_homeassistant(self):
"""Write all data to the Home Assistant object."""
self.sys_homeassistant.watchdog = self.homeassistant[ATTR_WATCHDOG]
self.sys_homeassistant.boot = self.homeassistant[ATTR_BOOT]
self.sys_homeassistant.wait_boot = self.homeassistant[ATTR_WAIT_BOOT]
# API/Proxy
self.sys_homeassistant.api_port = self.homeassistant[ATTR_PORT]
self.sys_homeassistant.api_ssl = self.homeassistant[ATTR_SSL]
self.sys_homeassistant.refresh_token = self._decrypt_data(
self.homeassistant[ATTR_REFRESH_TOKEN]
)
# Audio
self.sys_homeassistant.audio_input = self.homeassistant[ATTR_AUDIO_INPUT]
self.sys_homeassistant.audio_output = self.homeassistant[ATTR_AUDIO_OUTPUT]
# save
self.sys_homeassistant.save_data()
async def store_homeassistant_config_dir(self):
"""Backup Home Assitant Core configuration folder.""" """Backup Home Assitant Core configuration folder."""
self._data[ATTR_HOMEASSISTANT] = {ATTR_VERSION: self.sys_homeassistant.version}
# Backup Home Assistant Core config directory # Backup Home Assistant Core config directory
homeassistant_file = SecureTarFile( homeassistant_file = SecureTarFile(
@ -503,10 +462,13 @@ class Backup(CoreSysAttributes):
) )
await self.sys_homeassistant.backup(homeassistant_file) await self.sys_homeassistant.backup(homeassistant_file)
self._data[ATTR_FOLDERS].append(FOLDER_HOMEASSISTANT)
async def restore_homeassistant_config_dir(self): # Store size
self.homeassistant[ATTR_SIZE] = homeassistant_file.size
async def restore_homeassistant(self) -> Awaitable[None]:
"""Restore Home Assitant Core configuration folder.""" """Restore Home Assitant Core configuration folder."""
await self.sys_homeassistant.core.stop()
# Restore Home Assistant Core config directory # Restore Home Assistant Core config directory
homeassistant_file = SecureTarFile( homeassistant_file = SecureTarFile(
@ -515,16 +477,37 @@ class Backup(CoreSysAttributes):
await self.sys_homeassistant.restore(homeassistant_file) await self.sys_homeassistant.restore(homeassistant_file)
# Generate restore task
async def _core_update():
try:
if self.homeassistant_version == self.sys_homeassistant.version:
return
except TypeError:
# Home Assistant is not yet installed / None
pass
except AwesomeVersionCompareException as err:
raise BackupError(
f"Invalid Home Assistant Core version {self.homeassistant_version}",
_LOGGER.error,
) from err
await self.sys_homeassistant.core.update(self.homeassistant_version)
return self.sys_create_task(_core_update())
def store_repositories(self): def store_repositories(self):
"""Store repository list into backup.""" """Store repository list into backup."""
self.repositories = self.sys_config.addons_repositories self.repositories = self.sys_config.addons_repositories
def restore_repositories(self): async def restore_repositories(self, replace: bool = False):
"""Restore repositories from backup. """Restore repositories from backup.
Return a coroutine. Return a coroutine.
""" """
return self.sys_store.update_repositories(self.repositories) new_list: set[str] = set(self.repositories)
if not replace:
new_list.update(self.sys_config.addons_repositories)
await self.sys_store.update_repositories(list(new_list))
def store_dockerconfig(self): def store_dockerconfig(self):
"""Store the configuration for Docker.""" """Store the configuration for Docker."""
@ -538,8 +521,11 @@ class Backup(CoreSysAttributes):
} }
} }
def restore_dockerconfig(self): def restore_dockerconfig(self, replace: bool = False):
"""Restore the configuration for Docker.""" """Restore the configuration for Docker."""
if replace:
self.sys_docker.config.registries.clear()
if ATTR_REGISTRIES in self.docker: if ATTR_REGISTRIES in self.docker:
self.sys_docker.config.registries.update( self.sys_docker.config.registries.update(
{ {

View File

@ -1,15 +1,11 @@
"""Backup manager.""" """Backup manager."""
from __future__ import annotations
import asyncio import asyncio
import logging import logging
from pathlib import Path from pathlib import Path
from typing import Awaitable
from awesomeversion.awesomeversion import AwesomeVersion
from awesomeversion.exceptions import AwesomeVersionCompareException
from supervisor.addons.addon import Addon
from supervisor.backups.validate import ALL_FOLDERS
from ..addons.addon import Addon
from ..const import FOLDER_HOMEASSISTANT, CoreState from ..const import FOLDER_HOMEASSISTANT, CoreState
from ..coresys import CoreSysAttributes from ..coresys import CoreSysAttributes
from ..exceptions import AddonsError from ..exceptions import AddonsError
@ -18,6 +14,7 @@ from ..utils.dt import utcnow
from .backup import Backup from .backup import Backup
from .const import BackupType from .const import BackupType
from .utils import create_slug from .utils import create_slug
from .validate import ALL_FOLDERS
_LOGGER: logging.Logger = logging.getLogger(__name__) _LOGGER: logging.Logger = logging.getLogger(__name__)
@ -41,8 +38,12 @@ class BackupManager(CoreSysAttributes):
return self._backups.get(slug) return self._backups.get(slug)
def _create_backup( def _create_backup(
self, name, sys_type, password, compressed=True, homeassistant=True self,
): name: str,
sys_type: BackupType,
password: str | None,
compressed: bool = True,
) -> Backup:
"""Initialize a new backup object from name.""" """Initialize a new backup object from name."""
date_str = utcnow().isoformat() date_str = utcnow().isoformat()
slug = create_slug(name, date_str) slug = create_slug(name, date_str)
@ -52,10 +53,6 @@ class BackupManager(CoreSysAttributes):
backup = Backup(self.coresys, tar_file) backup = Backup(self.coresys, tar_file)
backup.new(slug, name, date_str, sys_type, password, compressed) backup.new(slug, name, date_str, sys_type, password, compressed)
# set general data
if homeassistant:
backup.store_homeassistant()
backup.store_repositories() backup.store_repositories()
backup.store_dockerconfig() backup.store_dockerconfig()
@ -136,6 +133,7 @@ class BackupManager(CoreSysAttributes):
backup: Backup, backup: Backup,
addon_list: list[Addon], addon_list: list[Addon],
folder_list: list[str], folder_list: list[str],
homeassistant: bool,
): ):
try: try:
self.sys_core.state = CoreState.FREEZE self.sys_core.state = CoreState.FREEZE
@ -147,8 +145,9 @@ class BackupManager(CoreSysAttributes):
await backup.store_addons(addon_list) await backup.store_addons(addon_list)
# Backup folders # Backup folders
if FOLDER_HOMEASSISTANT in folder_list: # HomeAssistant Folder is for v1
await backup.store_homeassistant_config_dir() if homeassistant or FOLDER_HOMEASSISTANT in folder_list:
await backup.store_homeassistant()
folder_list = list(folder_list) folder_list = list(folder_list)
folder_list.remove(FOLDER_HOMEASSISTANT) folder_list.remove(FOLDER_HOMEASSISTANT)
@ -178,7 +177,7 @@ class BackupManager(CoreSysAttributes):
_LOGGER.info("Creating new full backup with slug %s", backup.slug) _LOGGER.info("Creating new full backup with slug %s", backup.slug)
async with self.lock: async with self.lock:
backup = await self._do_backup( backup = await self._do_backup(
backup, self.sys_addons.installed, ALL_FOLDERS backup, self.sys_addons.installed, ALL_FOLDERS, True
) )
if backup: if backup:
_LOGGER.info("Creating full backup with slug %s completed", backup.slug) _LOGGER.info("Creating full backup with slug %s completed", backup.slug)
@ -205,9 +204,7 @@ class BackupManager(CoreSysAttributes):
if len(addons) == 0 and len(folders) == 0 and not homeassistant: if len(addons) == 0 and len(folders) == 0 and not homeassistant:
_LOGGER.error("Nothing to create backup for") _LOGGER.error("Nothing to create backup for")
backup = self._create_backup( backup = self._create_backup(name, BackupType.PARTIAL, password, compressed)
name, BackupType.PARTIAL, password, compressed, homeassistant
)
_LOGGER.info("Creating new partial backup with slug %s", backup.slug) _LOGGER.info("Creating new partial backup with slug %s", backup.slug)
async with self.lock: async with self.lock:
@ -219,7 +216,7 @@ class BackupManager(CoreSysAttributes):
continue continue
_LOGGER.warning("Add-on %s not found/installed", addon_slug) _LOGGER.warning("Add-on %s not found/installed", addon_slug)
backup = await self._do_backup(backup, addon_list, folders) backup = await self._do_backup(backup, addon_list, folders, homeassistant)
if backup: if backup:
_LOGGER.info( _LOGGER.info(
"Creating partial backup with slug %s completed", backup.slug "Creating partial backup with slug %s completed", backup.slug
@ -229,25 +226,22 @@ class BackupManager(CoreSysAttributes):
async def _do_restore( async def _do_restore(
self, self,
backup: Backup, backup: Backup,
addon_list: list[Addon], addon_list: list[str],
folder_list: list[str], folder_list: list[Path],
homeassistant: bool, homeassistant: bool,
remove_other_addons: bool, replace: bool,
): ):
try: # Version 1
# Stop Home Assistant Core if we restore the version or config directory if FOLDER_HOMEASSISTANT in folder_list:
if FOLDER_HOMEASSISTANT in folder_list or homeassistant: folder_list.remove(FOLDER_HOMEASSISTANT)
await self.sys_homeassistant.core.stop() homeassistant = True
try:
task_hass = None
async with backup: async with backup:
# Restore docker config # Restore docker config
_LOGGER.info("Restoring %s Docker config", backup.slug) _LOGGER.info("Restoring %s Docker config", backup.slug)
backup.restore_dockerconfig() backup.restore_dockerconfig(replace)
if FOLDER_HOMEASSISTANT in folder_list:
await backup.restore_homeassistant_config_dir()
folder_list = list(folder_list)
folder_list.remove(FOLDER_HOMEASSISTANT)
# Process folders # Process folders
if folder_list: if folder_list:
@ -255,21 +249,12 @@ class BackupManager(CoreSysAttributes):
await backup.restore_folders(folder_list) await backup.restore_folders(folder_list)
# Process Home-Assistant # Process Home-Assistant
task_hass = None
if homeassistant: if homeassistant:
_LOGGER.info("Restoring %s Home Assistant Core", backup.slug) _LOGGER.info("Restoring %s Home Assistant Core", backup.slug)
backup.restore_homeassistant() task_hass = await backup.restore_homeassistant()
task_hass = self._update_core_task(backup.homeassistant_version)
if addon_list:
_LOGGER.info("Restoring %s Repositories", backup.slug)
await backup.restore_repositories()
_LOGGER.info("Restoring %s Add-ons", backup.slug)
await backup.restore_addons(addon_list)
# Delete delta add-ons # Delete delta add-ons
if remove_other_addons: if replace:
_LOGGER.info("Removing Add-ons not in the backup %s", backup.slug) _LOGGER.info("Removing Add-ons not in the backup %s", backup.slug)
for addon in self.sys_addons.installed: for addon in self.sys_addons.installed:
if addon.slug in backup.addon_list: if addon.slug in backup.addon_list:
@ -282,6 +267,13 @@ class BackupManager(CoreSysAttributes):
except AddonsError: except AddonsError:
_LOGGER.warning("Can't uninstall Add-on %s", addon.slug) _LOGGER.warning("Can't uninstall Add-on %s", addon.slug)
if addon_list:
_LOGGER.info("Restoring %s Repositories", backup.slug)
await backup.restore_repositories(replace)
_LOGGER.info("Restoring %s Add-ons", backup.slug)
await backup.restore_addons(addon_list)
# Wait for Home Assistant Core update/downgrade # Wait for Home Assistant Core update/downgrade
if task_hass: if task_hass:
_LOGGER.info("Restore %s wait for Home-Assistant", backup.slug) _LOGGER.info("Restore %s wait for Home-Assistant", backup.slug)
@ -352,7 +344,12 @@ class BackupManager(CoreSysAttributes):
] ]
) )
async def do_restore_partial( async def do_restore_partial(
self, backup, homeassistant=False, addons=None, folders=None, password=None self,
backup: Backup,
homeassistant: bool = False,
addons: list[str] | None = None,
folders: list[Path] | None = None,
password: str | None = None,
): ):
"""Restore a backup.""" """Restore a backup."""
if self.lock.locked(): if self.lock.locked():
@ -363,6 +360,10 @@ class BackupManager(CoreSysAttributes):
_LOGGER.error("Invalid password for backup %s", backup.slug) _LOGGER.error("Invalid password for backup %s", backup.slug)
return False return False
if backup.homeassistant is None and homeassistant:
_LOGGER.error("No Home Assistant Core data inside the backup")
return False
addon_list = addons or [] addon_list = addons or []
folder_list = folders or [] folder_list = folders or []
@ -378,16 +379,3 @@ class BackupManager(CoreSysAttributes):
if success: if success:
_LOGGER.info("Partial-Restore %s done", backup.slug) _LOGGER.info("Partial-Restore %s done", backup.slug)
def _update_core_task(self, version: AwesomeVersion) -> Awaitable[None]:
"""Process core update if needed and make awaitable object."""
async def _core_update():
try:
if version == self.sys_homeassistant.version:
return
except (AwesomeVersionCompareException, TypeError):
pass
await self.sys_homeassistant.core.update(version)
return self.sys_create_task(_core_update())

View File

@ -1,49 +1,26 @@
"""Util add-on functions.""" """Util add-on functions."""
import hashlib import hashlib
import re import re
import shutil
RE_DIGITS = re.compile(r"\d+") RE_DIGITS = re.compile(r"\d+")
def password_to_key(password): def password_to_key(password: str) -> bytes:
"""Generate a AES Key from password.""" """Generate a AES Key from password."""
password = password.encode() key: bytes = password.encode()
for _ in range(100): for _ in range(100):
password = hashlib.sha256(password).digest() key = hashlib.sha256(key).digest()
return password[:16] return key[:16]
def password_for_validating(password): def key_to_iv(key: bytes) -> bytes:
"""Generate a SHA256 hash from password."""
for _ in range(100):
password = hashlib.sha256(password.encode()).hexdigest()
try:
return str(sum(map(int, RE_DIGITS.findall(password))))[0]
except (ValueError, IndexError):
return "0"
def key_to_iv(key):
"""Generate an iv from Key.""" """Generate an iv from Key."""
for _ in range(100): for _ in range(100):
key = hashlib.sha256(key).digest() key = hashlib.sha256(key).digest()
return key[:16] return key[:16]
def create_slug(name, date_str): def create_slug(name: str, date_str: str) -> str:
"""Generate a hash from repository.""" """Generate a hash from repository."""
key = f"{date_str} - {name}".lower().encode() key = f"{date_str} - {name}".lower().encode()
return hashlib.sha1(key).hexdigest()[:8] return hashlib.sha1(key).hexdigest()[:8]
def remove_folder(folder):
"""Remove folder data but not the folder itself."""
for obj in folder.iterdir():
try:
if obj.is_dir():
shutil.rmtree(obj, ignore_errors=True)
else:
obj.unlink()
except (OSError, shutil.Error):
pass

View File

@ -4,28 +4,19 @@ import voluptuous as vol
from ..backups.const import BackupType from ..backups.const import BackupType
from ..const import ( from ..const import (
ATTR_ADDONS, ATTR_ADDONS,
ATTR_AUDIO_INPUT,
ATTR_AUDIO_OUTPUT,
ATTR_BOOT,
ATTR_COMPRESSED, ATTR_COMPRESSED,
ATTR_CRYPTO, ATTR_CRYPTO,
ATTR_DATE, ATTR_DATE,
ATTR_DOCKER, ATTR_DOCKER,
ATTR_FOLDERS, ATTR_FOLDERS,
ATTR_HOMEASSISTANT, ATTR_HOMEASSISTANT,
ATTR_IMAGE,
ATTR_NAME, ATTR_NAME,
ATTR_PORT,
ATTR_PROTECTED, ATTR_PROTECTED,
ATTR_REFRESH_TOKEN,
ATTR_REPOSITORIES, ATTR_REPOSITORIES,
ATTR_SIZE, ATTR_SIZE,
ATTR_SLUG, ATTR_SLUG,
ATTR_SSL,
ATTR_TYPE, ATTR_TYPE,
ATTR_VERSION, ATTR_VERSION,
ATTR_WAIT_BOOT,
ATTR_WATCHDOG,
CRYPTO_AES128, CRYPTO_AES128,
FOLDER_ADDONS, FOLDER_ADDONS,
FOLDER_HOMEASSISTANT, FOLDER_HOMEASSISTANT,
@ -33,13 +24,7 @@ from ..const import (
FOLDER_SHARE, FOLDER_SHARE,
FOLDER_SSL, FOLDER_SSL,
) )
from ..validate import ( from ..validate import SCHEMA_DOCKER_CONFIG, repositories, version_tag
SCHEMA_DOCKER_CONFIG,
docker_image,
network_port,
repositories,
version_tag,
)
ALL_FOLDERS = [ ALL_FOLDERS = [
FOLDER_HOMEASSISTANT, FOLDER_HOMEASSISTANT,
@ -62,31 +47,22 @@ def unique_addons(addons_list):
# pylint: disable=no-value-for-parameter # pylint: disable=no-value-for-parameter
SCHEMA_BACKUP = vol.Schema( SCHEMA_BACKUP = vol.Schema(
{ {
vol.Optional(ATTR_VERSION, default=1): vol.All(vol.Coerce(int), vol.In((1, 2))),
vol.Required(ATTR_SLUG): str, vol.Required(ATTR_SLUG): str,
vol.Required(ATTR_TYPE): vol.Coerce(BackupType), vol.Required(ATTR_TYPE): vol.Coerce(BackupType),
vol.Required(ATTR_NAME): str, vol.Required(ATTR_NAME): str,
vol.Required(ATTR_DATE): str, vol.Required(ATTR_DATE): str,
vol.Optional(ATTR_COMPRESSED, default=True): vol.Boolean(), vol.Optional(ATTR_COMPRESSED, default=True): vol.Boolean(),
vol.Inclusive(ATTR_PROTECTED, "encrypted"): vol.All( vol.Optional(ATTR_PROTECTED, default=False): vol.Boolean(),
str, vol.Length(min=1, max=1) vol.Optional(ATTR_CRYPTO, default=None): vol.Maybe(CRYPTO_AES128),
), vol.Optional(ATTR_HOMEASSISTANT, default=None): vol.Maybe(
vol.Inclusive(ATTR_CRYPTO, "encrypted"): CRYPTO_AES128, vol.Schema(
vol.Optional(ATTR_HOMEASSISTANT, default=dict): vol.Schema( {
{ vol.Required(ATTR_VERSION): version_tag,
vol.Optional(ATTR_VERSION): version_tag, vol.Optional(ATTR_SIZE, default=0): vol.Coerce(float),
vol.Optional(ATTR_IMAGE): docker_image, },
vol.Optional(ATTR_BOOT, default=True): vol.Boolean(), extra=vol.REMOVE_EXTRA,
vol.Optional(ATTR_SSL, default=False): vol.Boolean(), )
vol.Optional(ATTR_PORT, default=8123): network_port,
vol.Optional(ATTR_REFRESH_TOKEN): vol.Maybe(str),
vol.Optional(ATTR_WATCHDOG, default=True): vol.Boolean(),
vol.Optional(ATTR_WAIT_BOOT, default=600): vol.All(
vol.Coerce(int), vol.Range(min=60)
),
vol.Optional(ATTR_AUDIO_OUTPUT, default=None): vol.Maybe(str),
vol.Optional(ATTR_AUDIO_INPUT, default=None): vol.Maybe(str),
},
extra=vol.REMOVE_EXTRA,
), ),
vol.Optional(ATTR_DOCKER, default=dict): SCHEMA_DOCKER_CONFIG, vol.Optional(ATTR_DOCKER, default=dict): SCHEMA_DOCKER_CONFIG,
vol.Optional(ATTR_FOLDERS, default=list): vol.All( vol.Optional(ATTR_FOLDERS, default=list): vol.All(

View File

@ -460,3 +460,10 @@ class StoreNotFound(StoreError):
class StoreJobError(StoreError, JobException): class StoreJobError(StoreError, JobException):
"""Raise on job error with git.""" """Raise on job error with git."""
# Backup
class BackupError(HassioError):
"""Raise if an error during backup is happening."""

View File

@ -5,14 +5,14 @@ import logging
from pathlib import Path from pathlib import Path
import shutil import shutil
import tarfile import tarfile
from tempfile import TemporaryDirectory
from typing import Optional from typing import Optional
from uuid import UUID from uuid import UUID
from awesomeversion import AwesomeVersion, AwesomeVersionException from awesomeversion import AwesomeVersion, AwesomeVersionException
from securetar import atomic_contents_add, secure_path
from supervisor.exceptions import HomeAssistantWSError import voluptuous as vol
from supervisor.homeassistant.const import WSType from voluptuous.humanize import humanize_error
from supervisor.utils.tar import SecureTarFile, atomic_contents_add
from ..const import ( from ..const import (
ATTR_ACCESS_TOKEN, ATTR_ACCESS_TOKEN,
@ -29,14 +29,22 @@ from ..const import (
ATTR_WAIT_BOOT, ATTR_WAIT_BOOT,
ATTR_WATCHDOG, ATTR_WATCHDOG,
FILE_HASSIO_HOMEASSISTANT, FILE_HASSIO_HOMEASSISTANT,
FOLDER_HOMEASSISTANT,
BusEvent, BusEvent,
) )
from ..coresys import CoreSys, CoreSysAttributes from ..coresys import CoreSys, CoreSysAttributes
from ..exceptions import (
ConfigurationFileError,
HomeAssistantError,
HomeAssistantWSError,
)
from ..hardware.const import PolicyGroup from ..hardware.const import PolicyGroup
from ..hardware.data import Device from ..hardware.data import Device
from ..jobs.decorator import Job
from ..utils import remove_folder
from ..utils.common import FileConfiguration from ..utils.common import FileConfiguration
from ..utils.json import read_json_file, write_json_file
from .api import HomeAssistantAPI from .api import HomeAssistantAPI
from .const import WSType
from .core import HomeAssistantCore from .core import HomeAssistantCore
from .secrets import HomeAssistantSecrets from .secrets import HomeAssistantSecrets
from .validate import SCHEMA_HASS_CONFIG from .validate import SCHEMA_HASS_CONFIG
@ -301,7 +309,8 @@ class HomeAssistant(FileConfiguration, CoreSysAttributes):
self.sys_homeassistant.websocket.send_message({ATTR_TYPE: "usb/scan"}) self.sys_homeassistant.websocket.send_message({ATTR_TYPE: "usb/scan"})
async def backup(self, secure_tar_file: SecureTarFile) -> None: @Job()
async def backup(self, tar_file: tarfile.TarFile) -> None:
"""Backup Home Assistant Core config/ directory.""" """Backup Home Assistant Core config/ directory."""
# Let Home Assistant Core know we are about to backup # Let Home Assistant Core know we are about to backup
@ -313,46 +322,114 @@ class HomeAssistant(FileConfiguration, CoreSysAttributes):
"Preparing backup of Home Assistant Core failed. Check HA Core logs." "Preparing backup of Home Assistant Core failed. Check HA Core logs."
) )
def _write_tarfile(): with TemporaryDirectory(dir=self.sys_config.path_tmp) as temp:
with secure_tar_file as tar_file: temp_path = Path(temp)
# Backup system
origin_dir = Path(self.sys_config.path_supervisor, FOLDER_HOMEASSISTANT)
# Backup data # Store local configs/state
atomic_contents_add(
tar_file,
origin_dir,
excludes=HOMEASSISTANT_BACKUP_EXCLUDE,
arcname=".",
)
try:
_LOGGER.info("Backing up Home Assistant Core config folder")
await self.sys_run_in_executor(_write_tarfile)
_LOGGER.info("Backup Home Assistant Core config folder done")
finally:
try: try:
await self.sys_homeassistant.websocket.async_send_command( write_json_file(temp_path.joinpath("homeassistant.json"), self._data)
{ATTR_TYPE: WSType.BACKUP_END} except ConfigurationFileError as err:
) raise HomeAssistantError(
except HomeAssistantWSError: f"Can't save meta for Home Assistant Core: {err!s}", _LOGGER.error
_LOGGER.warning( ) from err
"Error during Home Assistant Core backup. Check HA Core logs."
)
async def restore(self, secure_tar_file: SecureTarFile) -> None: # Backup data config folder
def _write_tarfile():
with tar_file as backup:
# Backup metadata
backup.add(temp, arcname=".")
# Backup data
atomic_contents_add(
backup,
self.sys_config.path_homeassistant,
excludes=HOMEASSISTANT_BACKUP_EXCLUDE,
arcname="data",
)
try:
_LOGGER.info("Backing up Home Assistant Core config folder")
await self.sys_run_in_executor(_write_tarfile)
_LOGGER.info("Backup Home Assistant Core config folder done")
finally:
try:
await self.sys_homeassistant.websocket.async_send_command(
{ATTR_TYPE: WSType.BACKUP_END}
)
except HomeAssistantWSError:
_LOGGER.warning(
"Error during Home Assistant Core backup. Check HA Core logs."
)
async def restore(self, tar_file: tarfile.TarFile) -> None:
"""Restore Home Assistant Core config/ directory.""" """Restore Home Assistant Core config/ directory."""
with TemporaryDirectory(dir=self.sys_config.path_tmp) as temp:
temp_path = Path(temp)
temp_data = temp_path.joinpath("data")
temp_meta = temp_path.joinpath("homeassistant.json")
# Perform a restore # extract backup
def _restore_tarfile(): def _extract_tarfile():
origin_dir = Path(self.sys_config.path_supervisor, FOLDER_HOMEASSISTANT) """Extract tar backup."""
with tar_file as backup:
backup.extractall(path=temp_path, members=secure_path(backup))
with secure_tar_file as tar_file: try:
tar_file.extractall(path=origin_dir, members=tar_file) await self.sys_run_in_executor(_extract_tarfile)
except tarfile.TarError as err:
raise HomeAssistantError(
f"Can't read tarfile {tar_file}: {err}", _LOGGER.error
) from err
# Check old backup format v1
if not temp_data.exists():
temp_data = temp_path
# Restore data
def _restore_data():
"""Restore data."""
shutil.copytree(
temp_data, self.sys_config.path_homeassistant, symlinks=True
)
try:
_LOGGER.info("Restore Home Assistant Core config folder") _LOGGER.info("Restore Home Assistant Core config folder")
await self.sys_run_in_executor(_restore_tarfile) await remove_folder(self.sys_config.path_homeassistant)
try:
await self.sys_run_in_executor(_restore_data)
except shutil.Error as err:
raise HomeAssistantError(
f"Can't restore origin data: {err}", _LOGGER.error
) from err
_LOGGER.info("Restore Home Assistant Core config folder done") _LOGGER.info("Restore Home Assistant Core config folder done")
except (tarfile.TarError, OSError) as err:
_LOGGER.warning("Can't restore Home Assistant Core config folder: %s", err) if not temp_meta.exists():
return
_LOGGER.info("Restore Home Assistant Core metadata")
# Read backup data
try:
data = read_json_file(temp_meta)
except ConfigurationFileError as err:
raise HomeAssistantError() from err
# Validate
try:
data = SCHEMA_HASS_CONFIG(data)
except vol.Invalid as err:
raise HomeAssistantError(
f"Can't validate backup data: {humanize_error(data, err)}",
_LOGGER.err,
) from err
# Restore metadata
for attr in (
ATTR_AUDIO_INPUT,
ATTR_AUDIO_OUTPUT,
ATTR_PORT,
ATTR_SSL,
ATTR_REFRESH_TOKEN,
ATTR_WATCHDOG,
ATTR_WAIT_BOOT,
):
self._data[attr] = data[attr]

View File

@ -1,182 +0,0 @@
"""Tarfile fileobject handler for encrypted files."""
import hashlib
import logging
import os
from pathlib import Path, PurePath
import tarfile
from typing import IO, Generator, Optional
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import (
Cipher,
CipherContext,
algorithms,
modes,
)
_LOGGER: logging.Logger = logging.getLogger(__name__)
BLOCK_SIZE = 16
BLOCK_SIZE_BITS = 128
MOD_READ = "r"
MOD_WRITE = "w"
class SecureTarFile:
"""Handle encrypted files for tarfile library."""
def __init__(
self, name: Path, mode: str, key: Optional[bytes] = None, gzip: bool = True
) -> None:
"""Initialize encryption handler."""
self._file: Optional[IO[bytes]] = None
self._mode: str = mode
self._name: Path = name
# Tarfile options
self._tar: Optional[tarfile.TarFile] = None
self._tar_mode: str = f"{mode}|gz" if gzip else f"{mode}|"
# Encryption/Description
self._aes: Optional[Cipher] = None
self._key: Optional[bytes] = key
# Function helper
self._decrypt: Optional[CipherContext] = None
self._encrypt: Optional[CipherContext] = None
def __enter__(self) -> tarfile.TarFile:
"""Start context manager tarfile."""
if not self._key:
self._tar = tarfile.open(
name=str(self._name), mode=self._tar_mode, dereference=False
)
return self._tar
# Encrypted/Decryped Tarfile
if self._mode.startswith("r"):
file_mode: int = os.O_RDONLY
else:
file_mode: int = os.O_WRONLY | os.O_CREAT
self._file = os.open(self._name, file_mode, 0o666)
# Extract IV for CBC
if self._mode == MOD_READ:
cbc_rand = os.read(self._file, 16)
else:
cbc_rand = os.urandom(16)
os.write(self._file, cbc_rand)
# Create Cipher
self._aes = Cipher(
algorithms.AES(self._key),
modes.CBC(_generate_iv(self._key, cbc_rand)),
backend=default_backend(),
)
self._decrypt = self._aes.decryptor()
self._encrypt = self._aes.encryptor()
self._tar = tarfile.open(fileobj=self, mode=self._tar_mode, dereference=False)
return self._tar
def __exit__(self, exc_type, exc_value, traceback) -> None:
"""Close file."""
if self._tar:
self._tar.close()
self._tar = None
if self._file:
os.close(self._file)
self._file = None
def write(self, data: bytes) -> None:
"""Write data."""
if len(data) % BLOCK_SIZE != 0:
padder = padding.PKCS7(BLOCK_SIZE_BITS).padder()
data = padder.update(data) + padder.finalize()
os.write(self._file, self._encrypt.update(data))
def read(self, size: int = 0) -> bytes:
"""Read data."""
return self._decrypt.update(os.read(self._file, size))
@property
def path(self) -> Path:
"""Return path object of tarfile."""
return self._name
@property
def size(self) -> float:
"""Return backup size."""
if not self._name.is_file():
return 0
return round(self._name.stat().st_size / 1_048_576, 2) # calc mbyte
def _generate_iv(key: bytes, salt: bytes) -> bytes:
"""Generate an iv from data."""
temp_iv = key + salt
for _ in range(100):
temp_iv = hashlib.sha256(temp_iv).digest()
return temp_iv[:16]
def secure_path(tar: tarfile.TarFile) -> Generator[tarfile.TarInfo, None, None]:
"""Security safe check of path.
Prevent ../ or absolut paths
"""
for member in tar:
file_path = Path(member.name)
try:
if file_path.is_absolute():
raise ValueError()
Path("/fake", file_path).resolve().relative_to("/fake")
except (ValueError, RuntimeError):
_LOGGER.warning("Found issue with file %s", file_path)
continue
else:
yield member
def _is_excluded_by_filter(path: PurePath, exclude_list: list[str]) -> bool:
"""Filter to filter excludes."""
for exclude in exclude_list:
if not path.match(exclude):
continue
_LOGGER.debug("Ignoring %s because of %s", path, exclude)
return True
return False
def atomic_contents_add(
tar_file: tarfile.TarFile,
origin_path: Path,
excludes: list[str],
arcname: str = ".",
) -> None:
"""Append directories and/or files to the TarFile if excludes wont filter."""
if _is_excluded_by_filter(origin_path, excludes):
return None
# Add directory only (recursive=False) to ensure we also archive empty directories
tar_file.add(origin_path.as_posix(), arcname=arcname, recursive=False)
for directory_item in origin_path.iterdir():
if _is_excluded_by_filter(directory_item, excludes):
continue
arcpath = PurePath(arcname, directory_item.name).as_posix()
if directory_item.is_dir() and not directory_item.is_symlink():
atomic_contents_add(tar_file, directory_item, excludes, arcpath)
continue
tar_file.add(directory_item.as_posix(), arcname=arcpath, recursive=False)
return None

View File

@ -19,10 +19,10 @@ def fixture_backup_mock():
backup_instance.store_addons = AsyncMock(return_value=None) backup_instance.store_addons = AsyncMock(return_value=None)
backup_instance.store_folders = AsyncMock(return_value=None) backup_instance.store_folders = AsyncMock(return_value=None)
backup_instance.store_homeassistant_config_dir = AsyncMock(return_value=None) backup_instance.store_homeassistant = AsyncMock(return_value=None)
backup_instance.store_addons = AsyncMock(return_value=None) backup_instance.store_addons = AsyncMock(return_value=None)
backup_instance.restore_folders = AsyncMock(return_value=None) backup_instance.restore_folders = AsyncMock(return_value=None)
backup_instance.restore_homeassistant_config_dir = AsyncMock(return_value=None) backup_instance.restore_homeassistant = AsyncMock(return_value=None)
backup_instance.restore_addons = AsyncMock(return_value=None) backup_instance.restore_addons = AsyncMock(return_value=None)
backup_instance.restore_repositories = AsyncMock(return_value=None) backup_instance.restore_repositories = AsyncMock(return_value=None)

View File

@ -64,7 +64,7 @@ async def test_do_backup_full_uncompressed(
backup_instance.store_folders.assert_called_once() backup_instance.store_folders.assert_called_once()
assert len(backup_instance.store_folders.call_args[0][0]) == 4 assert len(backup_instance.store_folders.call_args[0][0]) == 4
backup_instance.store_homeassistant_config_dir.assert_called_once() backup_instance.store_homeassistant.assert_called_once()
assert coresys.core.state == CoreState.RUNNING assert coresys.core.state == CoreState.RUNNING
@ -157,7 +157,7 @@ async def test_do_backup_partial_maximal(
backup_instance.store_folders.assert_called_once() backup_instance.store_folders.assert_called_once()
assert len(backup_instance.store_folders.call_args[0][0]) == 1 assert len(backup_instance.store_folders.call_args[0][0]) == 1
backup_instance.store_homeassistant_config_dir.assert_called_once() backup_instance.store_homeassistant.assert_called_once()
assert coresys.core.state == CoreState.RUNNING assert coresys.core.state == CoreState.RUNNING
@ -268,6 +268,6 @@ async def test_do_restore_partial_maximal(coresys: CoreSys, partial_backup_mock)
backup_instance.restore_addons.assert_called_once() backup_instance.restore_addons.assert_called_once()
backup_instance.restore_folders.assert_called_once() backup_instance.restore_folders.assert_called_once()
backup_instance.restore_homeassistant_config_dir.assert_called_once() backup_instance.restore_homeassistant.assert_called_once()
assert coresys.core.state == CoreState.RUNNING assert coresys.core.state == CoreState.RUNNING

View File

@ -2,6 +2,8 @@
# pylint: disable=import-error,protected-access # pylint: disable=import-error,protected-access
from pathlib import Path from pathlib import Path
from securetar import SecureTarFile
from supervisor.backups.backup import Backup from supervisor.backups.backup import Backup
from supervisor.backups.const import BackupType from supervisor.backups.const import BackupType
from supervisor.const import ATTR_DATE, ATTR_SLUG, ATTR_TYPE from supervisor.const import ATTR_DATE, ATTR_SLUG, ATTR_TYPE
@ -10,7 +12,6 @@ from supervisor.resolution.const import ContextType, SuggestionType
from supervisor.resolution.data import Suggestion from supervisor.resolution.data import Suggestion
from supervisor.resolution.fixups.clear_full_backup import FixupClearFullBackup from supervisor.resolution.fixups.clear_full_backup import FixupClearFullBackup
from supervisor.utils.dt import utcnow from supervisor.utils.dt import utcnow
from supervisor.utils.tar import SecureTarFile
async def test_fixup(coresys: CoreSys, tmp_path): async def test_fixup(coresys: CoreSys, tmp_path):

View File

@ -1,145 +0,0 @@
"""Test Tarfile functions."""
import os
from pathlib import Path, PurePath
import shutil
import attr
from supervisor.utils.tar import (
SecureTarFile,
_is_excluded_by_filter,
atomic_contents_add,
secure_path,
)
@attr.s
class TarInfo:
"""Fake TarInfo."""
name: str = attr.ib()
def test_secure_path():
"""Test Secure Path."""
test_list = [
TarInfo("test.txt"),
TarInfo("data/xy.blob"),
TarInfo("bla/blu/ble"),
TarInfo("data/../xy.blob"),
]
assert test_list == list(secure_path(test_list))
def test_not_secure_path():
"""Test Not secure path."""
test_list = [
TarInfo("/test.txt"),
TarInfo("data/../../xy.blob"),
TarInfo("/bla/blu/ble"),
]
assert [] == list(secure_path(test_list))
def test_is_excluded_by_filter_good():
"""Test exclude filter."""
filter_list = ["not/match", "/dev/xy"]
test_list = [
PurePath("test.txt"),
PurePath("data/xy.blob"),
PurePath("bla/blu/ble"),
PurePath("data/../xy.blob"),
]
for path_object in test_list:
assert _is_excluded_by_filter(path_object, filter_list) is False
def test_is_exclude_by_filter_bad():
"""Test exclude filter."""
filter_list = ["*.txt", "data/*", "bla/blu/ble"]
test_list = [
PurePath("test.txt"),
PurePath("data/xy.blob"),
PurePath("bla/blu/ble"),
PurePath("data/test_files/kk.txt"),
]
for path_object in test_list:
assert _is_excluded_by_filter(path_object, filter_list) is True
def test_create_pure_tar(tmp_path):
"""Test to create a tar file without encryption."""
# Prepair test folder
temp_orig = tmp_path.joinpath("orig")
fixture_data = Path(__file__).parents[1].joinpath("fixtures/tar_data")
shutil.copytree(fixture_data, temp_orig, symlinks=True)
# Create Tarfile
temp_tar = tmp_path.joinpath("backup.tar")
with SecureTarFile(temp_tar, "w") as tar_file:
atomic_contents_add(
tar_file,
temp_orig,
excludes=[],
arcname=".",
)
assert temp_tar.exists()
# Restore
temp_new = tmp_path.joinpath("new")
with SecureTarFile(temp_tar, "r") as tar_file:
tar_file.extractall(path=temp_new, members=tar_file)
assert temp_new.is_dir()
assert temp_new.joinpath("test_symlink").is_symlink()
assert temp_new.joinpath("test1").is_dir()
assert temp_new.joinpath("test1/script.sh").is_file()
# 775 is correct for local, but in GitHub action it's 755, both is fine
assert oct(temp_new.joinpath("test1/script.sh").stat().st_mode)[-3:] in [
"755",
"775",
]
assert temp_new.joinpath("README.md").is_file()
def test_create_ecrypted_tar(tmp_path):
"""Test to create a tar file with encryption."""
key = os.urandom(16)
# Prepair test folder
temp_orig = tmp_path.joinpath("orig")
fixture_data = Path(__file__).parents[1].joinpath("fixtures/tar_data")
shutil.copytree(fixture_data, temp_orig, symlinks=True)
# Create Tarfile
temp_tar = tmp_path.joinpath("backup.tar")
with SecureTarFile(temp_tar, "w", key=key) as tar_file:
atomic_contents_add(
tar_file,
temp_orig,
excludes=[],
arcname=".",
)
assert temp_tar.exists()
# Restore
temp_new = tmp_path.joinpath("new")
with SecureTarFile(temp_tar, "r", key=key) as tar_file:
tar_file.extractall(path=temp_new, members=tar_file)
assert temp_new.is_dir()
assert temp_new.joinpath("test_symlink").is_symlink()
assert temp_new.joinpath("test1").is_dir()
assert temp_new.joinpath("test1/script.sh").is_file()
# 775 is correct for local, but in GitHub action it's 755, both is fine
assert oct(temp_new.joinpath("test1/script.sh").stat().st_mode)[-3:] in [
"755",
"775",
]
assert temp_new.joinpath("README.md").is_file()