mirror of
https://github.com/home-assistant/core.git
synced 2025-04-25 01:38:02 +00:00

* Correct backup filename on delete or download of cloud backup * Improve tests * Address review comments
225 lines
7.1 KiB
Python
225 lines
7.1 KiB
Python
"""Backup platform for the cloud integration."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from collections.abc import AsyncIterator, Callable, Coroutine, Mapping
|
|
import logging
|
|
import random
|
|
from typing import Any
|
|
|
|
from aiohttp import ClientError
|
|
from hass_nabucasa import Cloud, CloudError
|
|
from hass_nabucasa.api import CloudApiNonRetryableError
|
|
from hass_nabucasa.cloud_api import (
|
|
FilesHandlerListEntry,
|
|
async_files_delete_file,
|
|
async_files_list,
|
|
)
|
|
from hass_nabucasa.files import FilesError, StorageType, calculate_b64md5
|
|
|
|
from homeassistant.components.backup import AgentBackup, BackupAgent, BackupAgentError
|
|
from homeassistant.core import HomeAssistant, callback
|
|
from homeassistant.helpers.aiohttp_client import ChunkAsyncStreamIterator
|
|
from homeassistant.helpers.dispatcher import async_dispatcher_connect
|
|
|
|
from .client import CloudClient
|
|
from .const import DATA_CLOUD, DOMAIN, EVENT_CLOUD_EVENT
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
_RETRY_LIMIT = 5
|
|
_RETRY_SECONDS_MIN = 60
|
|
_RETRY_SECONDS_MAX = 600
|
|
|
|
|
|
async def async_get_backup_agents(
    hass: HomeAssistant,
    **kwargs: Any,
) -> list[BackupAgent]:
    """Return the cloud backup agent.

    The list holds a single ``CloudBackupAgent`` when the cloud object stored
    in ``hass.data`` reports that it is logged in, and is empty otherwise.
    """
    cloud = hass.data[DATA_CLOUD]
    return [CloudBackupAgent(hass=hass, cloud=cloud)] if cloud.is_logged_in else []
|
|
|
|
|
|
@callback
def async_register_backup_agents_listener(
    hass: HomeAssistant,
    *,
    listener: Callable[[], None],
    **kwargs: Any,
) -> Callable[[], None]:
    """Register a listener to be called when agents are added or removed.

    The listener is invoked for cloud events whose ``type`` is ``"login"`` or
    ``"logout"``; every other event type is ignored.

    :param hass: The Home Assistant instance to connect the dispatcher on.
    :param listener: Callback without arguments, invoked on login/logout.
    :return: A callback that disconnects the listener again.
    """

    @callback
    def handle_event(data: Mapping[str, Any]) -> None:
        """Forward login and logout events to the listener."""
        if data["type"] in ("login", "logout"):
            listener()

    disconnect = async_dispatcher_connect(hass, EVENT_CLOUD_EVENT, handle_event)

    @callback
    def unsub() -> None:
        """Unsubscribe from events."""
        disconnect()

    return unsub
|
|
|
|
|
|
class CloudBackupAgent(BackupAgent):
    """Backup agent that stores backups through the cloud files API."""

    domain = DOMAIN
    name = DOMAIN
    unique_id = DOMAIN

    def __init__(self, hass: HomeAssistant, cloud: Cloud[CloudClient]) -> None:
        """Store the Home Assistant instance and the cloud handle."""
        super().__init__()
        self._hass = hass
        self._cloud = cloud

    async def async_download_backup(
        self,
        backup_id: str,
        **kwargs: Any,
    ) -> AsyncIterator[bytes]:
        """Download a backup file.

        :param backup_id: The ID of the backup that was returned in async_list_backups.
        :return: An async iterator that yields bytes.
        :raises BackupAgentError: If the backup is unknown or the download fails.
        """
        entry = await self._async_get_backup(backup_id)
        if not entry:
            raise BackupAgentError("Backup not found")

        try:
            # The stored file is addressed by the "Key" of its listing entry.
            stream = await self._cloud.files.download(
                storage_type=StorageType.BACKUP,
                filename=entry["Key"],
            )
        except CloudError as err:
            raise BackupAgentError(f"Failed to download backup: {err}") from err

        return ChunkAsyncStreamIterator(stream)

    async def async_upload_backup(
        self,
        *,
        open_stream: Callable[[], Coroutine[Any, Any, AsyncIterator[bytes]]],
        backup: AgentBackup,
        **kwargs: Any,
    ) -> None:
        """Upload a backup.

        Only protected backups are accepted. A failed upload is retried after
        a random wait until ``_RETRY_LIMIT`` attempts have been made, unless
        the cloud API reports the error as non-retryable.

        :param open_stream: A function returning an async iterator that yields bytes.
        :param backup: Metadata about the backup that should be uploaded.
        :raises BackupAgentError: If the backup is unprotected, hashing fails,
            or the upload ultimately fails.
        """
        if not backup.protected:
            raise BackupAgentError("Cloud backups must be protected")

        size = backup.size
        try:
            base64md5hash = await calculate_b64md5(open_stream, size)
        except FilesError as err:
            raise BackupAgentError(err) from err

        filename = f"{self._cloud.client.prefs.instance_id}.tar"
        metadata = backup.as_dict()

        for attempt in range(1, _RETRY_LIMIT + 1):
            try:
                await self._cloud.files.upload(
                    storage_type=StorageType.BACKUP,
                    open_stream=open_stream,
                    filename=filename,
                    base64md5hash=base64md5hash,
                    metadata=metadata,
                    size=size,
                )
            except CloudApiNonRetryableError as err:
                # This error code is reported with a translated "too large"
                # message that carries the backup size in GiB.
                if err.code == "NC-SH-FH-03":
                    raise BackupAgentError(
                        translation_domain=DOMAIN,
                        translation_key="backup_size_too_large",
                        translation_placeholders={
                            "size": str(round(size / (1024**3), 2))
                        },
                    ) from err
                raise BackupAgentError(f"Failed to upload backup {err}") from err
            except CloudError as err:
                if attempt == _RETRY_LIMIT:
                    raise BackupAgentError(f"Failed to upload backup {err}") from err
                delay = random.randint(_RETRY_SECONDS_MIN, _RETRY_SECONDS_MAX)
                _LOGGER.info(
                    "Failed to upload backup, retrying (%s/%s) in %ss: %s",
                    attempt + 1,  # number of the attempt about to be made
                    _RETRY_LIMIT,
                    delay,
                    err,
                )
                await asyncio.sleep(delay)
            else:
                # Upload succeeded, no further attempts needed.
                break

    async def async_delete_backup(
        self,
        backup_id: str,
        **kwargs: Any,
    ) -> None:
        """Delete a backup file.

        Nothing happens when no backup with the given ID is listed.

        :param backup_id: The ID of the backup that was returned in async_list_backups.
        :raises BackupAgentError: If the delete request fails.
        """
        entry = await self._async_get_backup(backup_id)
        if not entry:
            return

        try:
            await async_files_delete_file(
                self._cloud,
                storage_type=StorageType.BACKUP,
                filename=entry["Key"],
            )
        except (ClientError, CloudError) as err:
            raise BackupAgentError("Failed to delete backup") from err

    async def async_list_backups(self, **kwargs: Any) -> list[AgentBackup]:
        """Return all cloud backups, built from the metadata of each entry."""
        entries = await self._async_list_backups()
        return [AgentBackup.from_dict(entry["Metadata"]) for entry in entries]

    async def _async_list_backups(self) -> list[FilesHandlerListEntry]:
        """Return the raw listing entries of the backup storage.

        :raises BackupAgentError: If the listing request fails.
        """
        try:
            entries = await async_files_list(
                self._cloud, storage_type=StorageType.BACKUP
            )
        except (ClientError, CloudError) as err:
            raise BackupAgentError("Failed to list backups") from err

        _LOGGER.debug("Cloud backups: %s", entries)
        return entries

    async def async_get_backup(
        self,
        backup_id: str,
        **kwargs: Any,
    ) -> AgentBackup | None:
        """Return the backup with the given ID, or None if it is not listed."""
        entry = await self._async_get_backup(backup_id)
        return AgentBackup.from_dict(entry["Metadata"]) if entry else None

    async def _async_get_backup(
        self,
        backup_id: str,
    ) -> FilesHandlerListEntry | None:
        """Return the first listing entry whose metadata has the given ID.

        None is returned when no entry matches.
        """
        entries = await self._async_list_backups()
        return next(
            (
                entry
                for entry in entries
                if entry["Metadata"]["backup_id"] == backup_id
            ),
            None,
        )
|