Implement upload retry logic in CloudBackupAgent (#135062)

* Implement upload retry logic in CloudBackupAgent

* Update backup.py

Co-authored-by: Erik Montnemery <erik@montnemery.com>

* nit

---------

Co-authored-by: Erik Montnemery <erik@montnemery.com>
This commit is contained in:
Bram Kragten 2025-01-09 22:23:53 +01:00
parent 7f3f550b7b
commit b8b7daff5a
2 changed files with 77 additions and 27 deletions

View File

@@ -2,10 +2,12 @@
from __future__ import annotations
import asyncio
import base64
from collections.abc import AsyncIterator, Callable, Coroutine, Mapping
import hashlib
import logging
import random
from typing import Any, Self
from aiohttp import ClientError, ClientTimeout, StreamReader
@@ -26,6 +28,9 @@ from .const import DATA_CLOUD, DOMAIN, EVENT_CLOUD_EVENT
# Module-level logger for this integration.
_LOGGER = logging.getLogger(__name__)

# Storage type identifier passed to the cloud files API for backups.
_STORAGE_BACKUP = "backup"

# Maximum number of upload attempts before the failure is propagated.
_RETRY_LIMIT = 5
# Bounds (seconds) for the randomized delay between upload retries.
_RETRY_SECONDS_MIN = 60
_RETRY_SECONDS_MAX = 600
async def _b64md5(stream: AsyncIterator[bytes]) -> str:
@@ -149,6 +154,44 @@ class CloudBackupAgent(BackupAgent):
return ChunkAsyncStreamIterator(resp.content)
async def _async_do_upload_backup(
    self,
    *,
    open_stream: Callable[[], Coroutine[Any, Any, AsyncIterator[bytes]]],
    filename: str,
    base64md5hash: str,
    metadata: dict[str, Any],
    size: int,
) -> None:
    """Upload a backup."""
    # Step 1: ask the cloud service for a pre-signed upload target.
    try:
        upload_details = await async_files_upload_details(
            self._cloud,
            storage_type=_STORAGE_BACKUP,
            filename=filename,
            metadata=metadata,
            size=size,
            base64md5hash=base64md5hash,
        )
    except (ClientError, CloudError) as err:
        raise BackupAgentError("Failed to get upload details") from err

    # Step 2: stream the backup content to the pre-signed URL.
    request_headers = upload_details["headers"] | {"content-length": str(size)}
    try:
        response = await self._cloud.websession.put(
            upload_details["url"],
            data=await open_stream(),
            headers=request_headers,
            # Generous total timeout so very large backups can finish.
            timeout=ClientTimeout(connect=10.0, total=43200.0),  # 43200s == 12h
        )
        log_level = logging.DEBUG if response.status < 400 else logging.WARNING
        _LOGGER.log(log_level, "Backup upload status: %s", response.status)
        response.raise_for_status()
    except (TimeoutError, ClientError) as err:
        raise BackupAgentError("Failed to upload backup") from err
async def async_upload_backup(
self,
*,
@@ -165,34 +208,34 @@
raise BackupAgentError("Cloud backups must be protected")
base64md5hash = await _b64md5(await open_stream())
filename = self._get_backup_filename()
metadata = backup.as_dict()
size = backup.size
try:
details = await async_files_upload_details(
self._cloud,
storage_type=_STORAGE_BACKUP,
filename=self._get_backup_filename(),
metadata=backup.as_dict(),
size=backup.size,
base64md5hash=base64md5hash,
)
except (ClientError, CloudError) as err:
raise BackupAgentError("Failed to get upload details") from err
try:
upload_status = await self._cloud.websession.put(
details["url"],
data=await open_stream(),
headers=details["headers"] | {"content-length": str(backup.size)},
timeout=ClientTimeout(connect=10.0, total=43200.0), # 43200s == 12h
)
_LOGGER.log(
logging.DEBUG if upload_status.status < 400 else logging.WARNING,
"Backup upload status: %s",
upload_status.status,
)
upload_status.raise_for_status()
except (TimeoutError, ClientError) as err:
raise BackupAgentError("Failed to upload backup") from err
tries = 1
while tries <= _RETRY_LIMIT:
try:
await self._async_do_upload_backup(
open_stream=open_stream,
filename=filename,
base64md5hash=base64md5hash,
metadata=metadata,
size=size,
)
break
except BackupAgentError as err:
if tries == _RETRY_LIMIT:
raise
tries += 1
retry_timer = random.randint(_RETRY_SECONDS_MIN, _RETRY_SECONDS_MAX)
_LOGGER.info(
"Failed to upload backup, retrying (%s/%s) in %ss: %s",
tries,
_RETRY_LIMIT,
retry_timer,
err,
)
await asyncio.sleep(retry_timer)
async def async_delete_backup(
self,

View File

@@ -389,6 +389,7 @@ async def test_agents_upload_fail_put(
aioclient_mock: AiohttpClientMocker,
mock_get_upload_details: Mock,
put_mock_kwargs: dict[str, Any],
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test agent upload backup fails."""
client = await hass_client()
@@ -417,6 +418,9 @@
return_value=test_backup,
),
patch("pathlib.Path.open") as mocked_open,
patch("homeassistant.components.cloud.backup.asyncio.sleep"),
patch("homeassistant.components.cloud.backup.random.randint", return_value=60),
patch("homeassistant.components.cloud.backup._RETRY_LIMIT", 2),
):
mocked_open.return_value.read = Mock(side_effect=[b"test", b""])
fetch_backup.return_value = test_backup
@@ -426,6 +430,8 @@
)
await hass.async_block_till_done()
assert len(aioclient_mock.mock_calls) == 2
assert "Failed to upload backup, retrying (2/2) in 60s" in caplog.text
assert resp.status == 201
store_backups = hass_storage[BACKUP_DOMAIN]["data"]["backups"]
assert len(store_backups) == 1
@@ -469,6 +475,7 @@
return_value=test_backup,
),
patch("pathlib.Path.open") as mocked_open,
patch("homeassistant.components.cloud.backup.asyncio.sleep"),
):
mocked_open.return_value.read = Mock(side_effect=[b"test", b""])
fetch_backup.return_value = test_backup