mirror of
https://github.com/home-assistant/core.git
synced 2025-11-08 18:39:30 +00:00
Add upload capability to the backup integration (#128546)
* Add upload capability to the backup integration * Limit context switch * rename * coverage for http * Test receiving a backup file * Update test_manager.py Co-authored-by: Martin Hjelmare <marhje52@gmail.com> --------- Co-authored-by: Martin Hjelmare <marhje52@gmail.com>
This commit is contained in:
@@ -9,11 +9,15 @@ import hashlib
|
||||
import io
|
||||
import json
|
||||
from pathlib import Path
|
||||
from queue import SimpleQueue
|
||||
import shutil
|
||||
import tarfile
|
||||
from tarfile import TarError
|
||||
from tempfile import TemporaryDirectory
|
||||
import time
|
||||
from typing import Any, Protocol, cast
|
||||
|
||||
import aiohttp
|
||||
from securetar import SecureTarFile, atomic_contents_add
|
||||
|
||||
from homeassistant.backup_restore import RESTORE_BACKUP_FILE
|
||||
@@ -147,6 +151,15 @@ class BaseBackupManager(abc.ABC):
|
||||
async def async_remove_backup(self, *, slug: str, **kwargs: Any) -> None:
|
||||
"""Remove a backup."""
|
||||
|
||||
@abc.abstractmethod
|
||||
async def async_receive_backup(
|
||||
self,
|
||||
*,
|
||||
contents: aiohttp.BodyPartReader,
|
||||
**kwargs: Any,
|
||||
) -> None:
|
||||
"""Receive and store a backup file from upload."""
|
||||
|
||||
|
||||
class BackupManager(BaseBackupManager):
|
||||
"""Backup manager for the Backup integration."""
|
||||
@@ -222,6 +235,63 @@ class BackupManager(BaseBackupManager):
|
||||
LOGGER.debug("Removed backup located at %s", backup.path)
|
||||
self.backups.pop(slug)
|
||||
|
||||
async def async_receive_backup(
|
||||
self,
|
||||
*,
|
||||
contents: aiohttp.BodyPartReader,
|
||||
**kwargs: Any,
|
||||
) -> None:
|
||||
"""Receive and store a backup file from upload."""
|
||||
queue: SimpleQueue[tuple[bytes, asyncio.Future[None] | None] | None] = (
|
||||
SimpleQueue()
|
||||
)
|
||||
temp_dir_handler = await self.hass.async_add_executor_job(TemporaryDirectory)
|
||||
target_temp_file = Path(
|
||||
temp_dir_handler.name, contents.filename or "backup.tar"
|
||||
)
|
||||
|
||||
def _sync_queue_consumer() -> None:
|
||||
with target_temp_file.open("wb") as file_handle:
|
||||
while True:
|
||||
if (_chunk_future := queue.get()) is None:
|
||||
break
|
||||
_chunk, _future = _chunk_future
|
||||
if _future is not None:
|
||||
self.hass.loop.call_soon_threadsafe(_future.set_result, None)
|
||||
file_handle.write(_chunk)
|
||||
|
||||
fut: asyncio.Future[None] | None = None
|
||||
try:
|
||||
fut = self.hass.async_add_executor_job(_sync_queue_consumer)
|
||||
megabytes_sending = 0
|
||||
while chunk := await contents.read_chunk(BUF_SIZE):
|
||||
megabytes_sending += 1
|
||||
if megabytes_sending % 5 != 0:
|
||||
queue.put_nowait((chunk, None))
|
||||
continue
|
||||
|
||||
chunk_future = self.hass.loop.create_future()
|
||||
queue.put_nowait((chunk, chunk_future))
|
||||
await asyncio.wait(
|
||||
(fut, chunk_future),
|
||||
return_when=asyncio.FIRST_COMPLETED,
|
||||
)
|
||||
if fut.done():
|
||||
# The executor job failed
|
||||
break
|
||||
|
||||
queue.put_nowait(None) # terminate queue consumer
|
||||
finally:
|
||||
if fut is not None:
|
||||
await fut
|
||||
|
||||
def _move_and_cleanup() -> None:
|
||||
shutil.move(target_temp_file, self.backup_dir / target_temp_file.name)
|
||||
temp_dir_handler.cleanup()
|
||||
|
||||
await self.hass.async_add_executor_job(_move_and_cleanup)
|
||||
await self.load_backups()
|
||||
|
||||
async def async_create_backup(self, **kwargs: Any) -> Backup:
|
||||
"""Generate a backup."""
|
||||
if self.backing_up:
|
||||
|
||||
Reference in New Issue
Block a user