Add caching to onedrive (#137950)

* Add caching to onedrive

* Move cache invalidation
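The change boils down to a small TTL cache on the backup agent: listings are parsed once, served from an in-memory dict until a timestamp expires, and the timestamp is reset after any write so the next read refetches. A minimal sketch of the read-path pattern, with invented names (CachedLister, fetch) rather than anything from this PR; invalidation after writes is shown at the end of the diff:

# Minimal sketch of the TTL pattern: reads are served from an in-memory
# dict until a timestamp expires. Names here are illustrative only.
from collections.abc import Callable
from time import time

CACHE_TTL = 300  # seconds, same value this commit introduces


class CachedLister:
    def __init__(self, fetch: Callable[[], dict[str, str]]) -> None:
        self._fetch = fetch  # expensive callable, e.g. a cloud folder listing
        self._cache: dict[str, str] = {}
        self._cache_expiration = time()  # starts expired: first read always fetches

    def list_items(self) -> dict[str, str]:
        if time() <= self._cache_expiration:
            return self._cache  # still fresh, skip the expensive fetch
        self._cache = self._fetch()
        self._cache_expiration = time() + CACHE_TTL
        return self._cache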
Josef Zweck 2025-02-10 19:52:28 +01:00 committed by GitHub
parent 663860e9c2
commit e3fa739446

@@ -3,10 +3,12 @@
 from __future__ import annotations
 
 from collections.abc import AsyncIterator, Callable, Coroutine
+from dataclasses import dataclass
 from functools import wraps
 from html import unescape
 from json import dumps, loads
 import logging
+from time import time
 from typing import Any, Concatenate
 
 from aiohttp import ClientTimeout
@@ -16,7 +18,7 @@ from onedrive_personal_sdk.exceptions import (
     HashMismatchError,
     OneDriveException,
 )
-from onedrive_personal_sdk.models.items import File, Folder, ItemUpdate
+from onedrive_personal_sdk.models.items import ItemUpdate
 from onedrive_personal_sdk.models.upload import FileInfo
 
 from homeassistant.components.backup import (
@@ -35,6 +37,7 @@ _LOGGER = logging.getLogger(__name__)
 UPLOAD_CHUNK_SIZE = 16 * 320 * 1024  # 5.2MB
 TIMEOUT = ClientTimeout(connect=10, total=43200)  # 12 hours
 METADATA_VERSION = 2
+CACHE_TTL = 300
 
 
 async def async_get_backup_agents(
@@ -99,6 +102,15 @@ def handle_backup_errors[_R, **P](
     return wrapper
 
 
+@dataclass(kw_only=True)
+class OneDriveBackup:
+    """Define a OneDrive backup."""
+
+    backup: AgentBackup
+    backup_file_id: str
+    metadata_file_id: str
+
+
 class OneDriveBackupAgent(BackupAgent):
     """OneDrive backup agent."""
 
@@ -115,24 +127,20 @@ class OneDriveBackupAgent(BackupAgent):
         self.name = entry.title
         assert entry.unique_id
         self.unique_id = entry.unique_id
+        self._backup_cache: dict[str, OneDriveBackup] = {}
+        self._cache_expiration = time()
 
     @handle_backup_errors
     async def async_download_backup(
         self, backup_id: str, **kwargs: Any
     ) -> AsyncIterator[bytes]:
         """Download a backup file."""
-        metadata_item = await self._find_item_by_backup_id(backup_id)
-        if (
-            metadata_item is None
-            or metadata_item.description is None
-            or "backup_file_id" not in metadata_item.description
-        ):
+        backups = await self._list_cached_backups()
+        if backup_id not in backups:
             raise BackupAgentError("Backup not found")
 
-        metadata_info = loads(unescape(metadata_item.description))
-
         stream = await self._client.download_drive_item(
-            metadata_info["backup_file_id"], timeout=TIMEOUT
+            backups[backup_id].backup_file_id, timeout=TIMEOUT
         )
         return stream.iter_chunked(1024)
 
@@ -181,6 +189,7 @@
             path_or_id=metadata_file.id,
             data=ItemUpdate(description=dumps(metadata_description)),
         )
+        self._cache_expiration = time()
 
     @handle_backup_errors
     async def async_delete_backup(
@@ -189,28 +198,21 @@ class OneDriveBackupAgent(BackupAgent):
         **kwargs: Any,
     ) -> None:
         """Delete a backup file."""
-        metadata_item = await self._find_item_by_backup_id(backup_id)
-        if (
-            metadata_item is None
-            or metadata_item.description is None
-            or "backup_file_id" not in metadata_item.description
-        ):
+        backups = await self._list_cached_backups()
+        if backup_id not in backups:
             return
 
-        metadata_info = loads(unescape(metadata_item.description))
-        await self._client.delete_drive_item(metadata_info["backup_file_id"])
-        await self._client.delete_drive_item(metadata_item.id)
+        backup = backups[backup_id]
+        await self._client.delete_drive_item(backup.backup_file_id)
+        await self._client.delete_drive_item(backup.metadata_file_id)
+        self._cache_expiration = time()
 
     @handle_backup_errors
     async def async_list_backups(self, **kwargs: Any) -> list[AgentBackup]:
         """List backups."""
-        items = await self._client.list_drive_items(self._folder_id)
         return [
-            await self._download_backup_metadata(item.id)
-            for item in items
-            if item.description
-            and "backup_id" in item.description
-            and f'"metadata_version": {METADATA_VERSION}' in unescape(item.description)
+            backup.backup for backup in (await self._list_cached_backups()).values()
         ]
 
     @handle_backup_errors
@@ -218,27 +220,34 @@ class OneDriveBackupAgent(BackupAgent):
         self, backup_id: str, **kwargs: Any
     ) -> AgentBackup | None:
         """Return a backup."""
-        metadata_file = await self._find_item_by_backup_id(backup_id)
-        if metadata_file is None or metadata_file.description is None:
-            return None
+        backups = await self._list_cached_backups()
+        return backups[backup_id].backup if backup_id in backups else None
 
-        return await self._download_backup_metadata(metadata_file.id)
+    async def _list_cached_backups(self) -> dict[str, OneDriveBackup]:
+        """List backups with a cache."""
+        if time() <= self._cache_expiration:
+            return self._backup_cache
 
-    async def _find_item_by_backup_id(self, backup_id: str) -> File | Folder | None:
-        """Find an item by backup ID."""
-        return next(
-            (
-                item
-                for item in await self._client.list_drive_items(self._folder_id)
-                if item.description
-                and backup_id in item.description
-                and f'"metadata_version": {METADATA_VERSION}'
-                in unescape(item.description)
-            ),
-            None,
-        )
+        items = await self._client.list_drive_items(self._folder_id)
 
-    async def _download_backup_metadata(self, item_id: str) -> AgentBackup:
-        metadata_stream = await self._client.download_drive_item(item_id)
-        metadata_json = loads(await metadata_stream.read())
-        return AgentBackup.from_dict(metadata_json)
+        async def download_backup_metadata(item_id: str) -> AgentBackup:
+            metadata_stream = await self._client.download_drive_item(item_id)
+            metadata_json = loads(await metadata_stream.read())
+            return AgentBackup.from_dict(metadata_json)
+
+        backups: dict[str, OneDriveBackup] = {}
+        for item in items:
+            if item.description and f'"metadata_version": {METADATA_VERSION}' in (
+                metadata_description_json := unescape(item.description)
+            ):
+                backup = await download_backup_metadata(item.id)
+                metadata_description = loads(metadata_description_json)
+                backups[backup.backup_id] = OneDriveBackup(
+                    backup=backup,
+                    backup_file_id=metadata_description["backup_file_id"],
+                    metadata_file_id=item.id,
+                )
+
+        self._cache_expiration = time() + CACHE_TTL
+        self._backup_cache = backups
+        return backups
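
Taken together, the hunks above route every read (download, get, list, delete lookup) through _list_cached_backups, while every write (upload, delete) sets _cache_expiration = time() so the cache goes stale immediately. A short end-to-end sketch of that behavior under those assumptions; FakeClient and CachedAgent are invented stand-ins, not the onedrive_personal_sdk client or the real agent:

# End-to-end sketch: cached reads plus immediate invalidation after writes.
import asyncio
from time import time

CACHE_TTL = 300


class FakeClient:
    """Pretend Graph client; in the real agent this is a network call."""

    def __init__(self) -> None:
        self.calls = 0

    async def list_drive_items(self, folder_id: str) -> dict[str, str]:
        self.calls += 1
        return {"backup-1": "file-id-1"}


class CachedAgent:
    def __init__(self, client: FakeClient, folder_id: str) -> None:
        self._client = client
        self._folder_id = folder_id
        self._backup_cache: dict[str, str] = {}
        self._cache_expiration = time()  # starts expired, so the first read fetches

    async def list_backups(self) -> dict[str, str]:
        """Read path: serve from the cache until CACHE_TTL has elapsed."""
        if time() <= self._cache_expiration:
            return self._backup_cache
        self._backup_cache = await self._client.list_drive_items(self._folder_id)
        self._cache_expiration = time() + CACHE_TTL
        return self._backup_cache

    async def delete_backup(self, backup_id: str) -> None:
        """Write path: mutate the folder, then expire the cache right away."""
        # (the real agent deletes the backup and metadata drive items here)
        self._cache_expiration = time()


async def main() -> None:
    client = FakeClient()
    agent = CachedAgent(client, "backups-folder")
    await agent.list_backups()            # fetches (calls == 1)
    await agent.list_backups()            # served from cache (calls still 1)
    await agent.delete_backup("backup-1")
    await agent.list_backups()            # refetches because the write expired the cache
    print(client.calls)                   # -> 2


asyncio.run(main())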