diff --git a/homeassistant/components/onedrive/backup.py b/homeassistant/components/onedrive/backup.py index 9926bd9cbc7..343c332f384 100644 --- a/homeassistant/components/onedrive/backup.py +++ b/homeassistant/components/onedrive/backup.py @@ -3,10 +3,12 @@ from __future__ import annotations from collections.abc import AsyncIterator, Callable, Coroutine +from dataclasses import dataclass from functools import wraps from html import unescape from json import dumps, loads import logging +from time import time from typing import Any, Concatenate from aiohttp import ClientTimeout @@ -16,7 +18,7 @@ from onedrive_personal_sdk.exceptions import ( HashMismatchError, OneDriveException, ) -from onedrive_personal_sdk.models.items import File, Folder, ItemUpdate +from onedrive_personal_sdk.models.items import ItemUpdate from onedrive_personal_sdk.models.upload import FileInfo from homeassistant.components.backup import ( @@ -35,6 +37,7 @@ _LOGGER = logging.getLogger(__name__) UPLOAD_CHUNK_SIZE = 16 * 320 * 1024 # 5.2MB TIMEOUT = ClientTimeout(connect=10, total=43200) # 12 hours METADATA_VERSION = 2 +CACHE_TTL = 300 async def async_get_backup_agents( @@ -99,6 +102,15 @@ def handle_backup_errors[_R, **P]( return wrapper +@dataclass(kw_only=True) +class OneDriveBackup: + """Define a OneDrive backup.""" + + backup: AgentBackup + backup_file_id: str + metadata_file_id: str + + class OneDriveBackupAgent(BackupAgent): """OneDrive backup agent.""" @@ -115,24 +127,20 @@ class OneDriveBackupAgent(BackupAgent): self.name = entry.title assert entry.unique_id self.unique_id = entry.unique_id + self._backup_cache: dict[str, OneDriveBackup] = {} + self._cache_expiration = time() @handle_backup_errors async def async_download_backup( self, backup_id: str, **kwargs: Any ) -> AsyncIterator[bytes]: """Download a backup file.""" - metadata_item = await self._find_item_by_backup_id(backup_id) - if ( - metadata_item is None - or metadata_item.description is None - or "backup_file_id" not in metadata_item.description - ): + backups = await self._list_cached_backups() + if backup_id not in backups: raise BackupAgentError("Backup not found") - metadata_info = loads(unescape(metadata_item.description)) - stream = await self._client.download_drive_item( - metadata_info["backup_file_id"], timeout=TIMEOUT + backups[backup_id].backup_file_id, timeout=TIMEOUT ) return stream.iter_chunked(1024) @@ -181,6 +189,7 @@ class OneDriveBackupAgent(BackupAgent): path_or_id=metadata_file.id, data=ItemUpdate(description=dumps(metadata_description)), ) + self._cache_expiration = time() @handle_backup_errors async def async_delete_backup( @@ -189,28 +198,21 @@ class OneDriveBackupAgent(BackupAgent): **kwargs: Any, ) -> None: """Delete a backup file.""" - metadata_item = await self._find_item_by_backup_id(backup_id) - if ( - metadata_item is None - or metadata_item.description is None - or "backup_file_id" not in metadata_item.description - ): + backups = await self._list_cached_backups() + if backup_id not in backups: return - metadata_info = loads(unescape(metadata_item.description)) - await self._client.delete_drive_item(metadata_info["backup_file_id"]) - await self._client.delete_drive_item(metadata_item.id) + backup = backups[backup_id] + + await self._client.delete_drive_item(backup.backup_file_id) + await self._client.delete_drive_item(backup.metadata_file_id) + self._cache_expiration = time() @handle_backup_errors async def async_list_backups(self, **kwargs: Any) -> list[AgentBackup]: """List backups.""" - items = await self._client.list_drive_items(self._folder_id) return [ - await self._download_backup_metadata(item.id) - for item in items - if item.description - and "backup_id" in item.description - and f'"metadata_version": {METADATA_VERSION}' in unescape(item.description) + backup.backup for backup in (await self._list_cached_backups()).values() ] @handle_backup_errors @@ -218,27 +220,34 @@ class OneDriveBackupAgent(BackupAgent): self, backup_id: str, **kwargs: Any ) -> AgentBackup | None: """Return a backup.""" - metadata_file = await self._find_item_by_backup_id(backup_id) - if metadata_file is None or metadata_file.description is None: - return None + backups = await self._list_cached_backups() + return backups[backup_id].backup if backup_id in backups else None - return await self._download_backup_metadata(metadata_file.id) + async def _list_cached_backups(self) -> dict[str, OneDriveBackup]: + """List backups with a cache.""" + if time() <= self._cache_expiration: + return self._backup_cache - async def _find_item_by_backup_id(self, backup_id: str) -> File | Folder | None: - """Find an item by backup ID.""" - return next( - ( - item - for item in await self._client.list_drive_items(self._folder_id) - if item.description - and backup_id in item.description - and f'"metadata_version": {METADATA_VERSION}' - in unescape(item.description) - ), - None, - ) + items = await self._client.list_drive_items(self._folder_id) - async def _download_backup_metadata(self, item_id: str) -> AgentBackup: - metadata_stream = await self._client.download_drive_item(item_id) - metadata_json = loads(await metadata_stream.read()) - return AgentBackup.from_dict(metadata_json) + async def download_backup_metadata(item_id: str) -> AgentBackup: + metadata_stream = await self._client.download_drive_item(item_id) + metadata_json = loads(await metadata_stream.read()) + return AgentBackup.from_dict(metadata_json) + + backups: dict[str, OneDriveBackup] = {} + for item in items: + if item.description and f'"metadata_version": {METADATA_VERSION}' in ( + metadata_description_json := unescape(item.description) + ): + backup = await download_backup_metadata(item.id) + metadata_description = loads(metadata_description_json) + backups[backup.backup_id] = OneDriveBackup( + backup=backup, + backup_file_id=metadata_description["backup_file_id"], + metadata_file_id=item.id, + ) + + self._cache_expiration = time() + CACHE_TTL + self._backup_cache = backups + return backups