Files
supervisor/hassio/utils/tar.py
Pascal Vizeli 7f074142bf Replace pycrpytodome with cryptocraphy (#923)
* Replace pycrpytodome with cryptocraphy

* Fix typing

* fix typing

* Fix lints

* Fix build

* Add musl libc

* Fix lint

* fix lint

* Fix algo

* Add more typing fix crypto imports v2

* Fix padding
2019-02-20 10:30:22 +01:00

114 lines
3.2 KiB
Python

"""Tarfile fileobject handler for encrypted files."""
import hashlib
import os
from pathlib import Path
import tarfile
from typing import IO, Optional
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import (
CipherContext,
Cipher,
algorithms,
modes,
)
BLOCK_SIZE = 16
BLOCK_SIZE_BITS = 128
MOD_READ = "r"
MOD_WRITE = "w"
class SecureTarFile:
"""Handle encrypted files for tarfile library."""
def __init__(
self, name: Path, mode: str, key: Optional[bytes] = None, gzip: bool = True
) -> None:
"""Initialize encryption handler."""
self._file: Optional[IO[bytes]] = None
self._mode: str = mode
self._name: Path = name
# Tarfile options
self._tar: Optional[tarfile.TarFile] = None
self._tar_mode: str = f"{mode}|gz" if gzip else f"{mode}|"
# Encryption/Description
self._aes: Optional[Cipher] = None
self._key: bytes = key
# Function helper
self._decrypt: Optional[CipherContext] = None
self._encrypt: Optional[CipherContext] = None
def __enter__(self) -> tarfile.TarFile:
"""Start context manager tarfile."""
if not self._key:
self._tar = tarfile.open(name=str(self._name), mode=self._tar_mode)
return self._tar
# Encrypted/Decryped Tarfile
self._file = self._name.open(f"{self._mode}b")
# Extract IV for CBC
if self._mode == MOD_READ:
cbc_rand = self._file.read(16)
else:
cbc_rand = os.urandom(16)
self._file.write(cbc_rand)
# Create Cipher
self._aes = Cipher(
algorithms.AES(self._key),
modes.CBC(_generate_iv(self._key, cbc_rand)),
backend=default_backend(),
)
self._decrypt = self._aes.decryptor()
self._encrypt = self._aes.encryptor()
self._tar = tarfile.open(fileobj=self, mode=self._tar_mode)
return self._tar
def __exit__(self, exc_type, exc_value, traceback) -> None:
"""Close file."""
if self._tar:
self._tar.close()
if self._file:
self._file.close()
def write(self, data: bytes) -> None:
"""Write data."""
if len(data) % BLOCK_SIZE != 0:
padder = padding.PKCS7(BLOCK_SIZE_BITS).padder()
data = padder.update(data) + padder.finalize()
self._file.write(self._encrypt.update(data))
def read(self, size: int = 0) -> bytes:
"""Read data."""
return self._decrypt.update(self._file.read(size))
@property
def path(self) -> Path:
"""Return path object of tarfile."""
return self._name
@property
def size(self) -> int:
"""Return snapshot size."""
if not self._name.is_file():
return 0
return round(self._name.stat().st_size / 1_048_576, 2) # calc mbyte
def _generate_iv(key: bytes, salt: bytes) -> bytes:
"""Generate an iv from data."""
temp_iv = key + salt
for _ in range(100):
temp_iv = hashlib.sha256(temp_iv).digest()
return temp_iv[:16]