Add S3 integration (#139325)

* Add S3 integration

* Improve translations and error handling

* Test S3 integration

* Update QoS

* Add missing data_description strings

* Fix missing async_initialize_backup in tests

* PR changes

* Remove unique ID, rely on abort_entries_match

* Raise only BackupAgentError (#139754), introduce decorator for error handling

* Switch to metadata-file based solution

* PR changes

* Revert strict typing

* Bump dependency

* Silence mypy

* Pass docs URLs as description_placeholders

* PR changes

* Rename _api to api

* PR Changes

* PR Changes 2

* Remove api abstraction

* Handle S3 multipart upload size limitations

* PR changes
This commit is contained in:
Tomáš Bedřich 2025-04-25 20:16:44 +02:00 committed by GitHub
parent a057effad5
commit 6a115d0133
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 1477 additions and 0 deletions

2
CODEOWNERS generated
View File

@ -1318,6 +1318,8 @@ build.json @home-assistant/supervisor
/tests/components/ruuvitag_ble/ @akx
/homeassistant/components/rympro/ @OnFreund @elad-bar @maorcc
/tests/components/rympro/ @OnFreund @elad-bar @maorcc
/homeassistant/components/s3/ @tomasbedrich
/tests/components/s3/ @tomasbedrich
/homeassistant/components/sabnzbd/ @shaiu @jpbede
/tests/components/sabnzbd/ @shaiu @jpbede
/homeassistant/components/saj/ @fredericvl

View File

@ -0,0 +1,82 @@
"""The S3 integration."""
from __future__ import annotations
import logging
from typing import cast
from aiobotocore.client import AioBaseClient as S3Client
from aiobotocore.session import AioSession
from botocore.exceptions import ClientError, ConnectionError, ParamValidationError
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryError, ConfigEntryNotReady
from .const import (
CONF_ACCESS_KEY_ID,
CONF_BUCKET,
CONF_ENDPOINT_URL,
CONF_SECRET_ACCESS_KEY,
DATA_BACKUP_AGENT_LISTENERS,
DOMAIN,
)
type S3ConfigEntry = ConfigEntry[S3Client]
_LOGGER = logging.getLogger(__name__)
async def async_setup_entry(hass: HomeAssistant, entry: S3ConfigEntry) -> bool:
"""Set up S3 from a config entry."""
data = cast(dict, entry.data)
try:
session = AioSession()
# pylint: disable-next=unnecessary-dunder-call
client = await session.create_client(
"s3",
endpoint_url=data.get(CONF_ENDPOINT_URL),
aws_secret_access_key=data[CONF_SECRET_ACCESS_KEY],
aws_access_key_id=data[CONF_ACCESS_KEY_ID],
).__aenter__()
await client.head_bucket(Bucket=data[CONF_BUCKET])
except ClientError as err:
raise ConfigEntryError(
translation_domain=DOMAIN,
translation_key="invalid_credentials",
) from err
except ParamValidationError as err:
if "Invalid bucket name" in str(err):
raise ConfigEntryError(
translation_domain=DOMAIN,
translation_key="invalid_bucket_name",
) from err
except ValueError as err:
raise ConfigEntryError(
translation_domain=DOMAIN,
translation_key="invalid_endpoint_url",
) from err
except ConnectionError as err:
raise ConfigEntryNotReady(
translation_domain=DOMAIN,
translation_key="cannot_connect",
) from err
entry.runtime_data = client
def notify_backup_listeners() -> None:
for listener in hass.data.get(DATA_BACKUP_AGENT_LISTENERS, []):
listener()
entry.async_on_unload(entry.async_on_state_change(notify_backup_listeners))
return True
async def async_unload_entry(hass: HomeAssistant, entry: S3ConfigEntry) -> bool:
"""Unload a config entry."""
client = entry.runtime_data
await client.__aexit__(None, None, None)
return True

View File

@ -0,0 +1,330 @@
"""Backup platform for the S3 integration."""
from collections.abc import AsyncIterator, Callable, Coroutine
import functools
import json
import logging
from time import time
from typing import Any
from botocore.exceptions import BotoCoreError
from homeassistant.components.backup import (
AgentBackup,
BackupAgent,
BackupAgentError,
BackupNotFound,
suggested_filename,
)
from homeassistant.core import HomeAssistant, callback
from . import S3ConfigEntry
from .const import CONF_BUCKET, DATA_BACKUP_AGENT_LISTENERS, DOMAIN
_LOGGER = logging.getLogger(__name__)
CACHE_TTL = 300
# S3 part size requirements: 5 MiB to 5 GiB per part
# https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
# We set the threshold to 20 MiB to avoid too many parts.
# Note that each part is allocated in the memory.
MULTIPART_MIN_PART_SIZE_BYTES = 20 * 2**20
def handle_boto_errors[T](
func: Callable[..., Coroutine[Any, Any, T]],
) -> Callable[..., Coroutine[Any, Any, T]]:
"""Handle BotoCoreError exceptions by converting them to BackupAgentError."""
@functools.wraps(func)
async def wrapper(*args: Any, **kwargs: Any) -> T:
"""Catch BotoCoreError and raise BackupAgentError."""
try:
return await func(*args, **kwargs)
except BotoCoreError as err:
error_msg = f"Failed during {func.__name__}"
raise BackupAgentError(error_msg) from err
return wrapper
async def async_get_backup_agents(
hass: HomeAssistant,
) -> list[BackupAgent]:
"""Return a list of backup agents."""
entries: list[S3ConfigEntry] = hass.config_entries.async_loaded_entries(DOMAIN)
return [S3BackupAgent(hass, entry) for entry in entries]
@callback
def async_register_backup_agents_listener(
hass: HomeAssistant,
*,
listener: Callable[[], None],
**kwargs: Any,
) -> Callable[[], None]:
"""Register a listener to be called when agents are added or removed.
:return: A function to unregister the listener.
"""
hass.data.setdefault(DATA_BACKUP_AGENT_LISTENERS, []).append(listener)
@callback
def remove_listener() -> None:
"""Remove the listener."""
hass.data[DATA_BACKUP_AGENT_LISTENERS].remove(listener)
if not hass.data[DATA_BACKUP_AGENT_LISTENERS]:
del hass.data[DATA_BACKUP_AGENT_LISTENERS]
return remove_listener
def suggested_filenames(backup: AgentBackup) -> tuple[str, str]:
"""Return the suggested filenames for the backup and metadata files."""
base_name = suggested_filename(backup).rsplit(".", 1)[0]
return f"{base_name}.tar", f"{base_name}.metadata.json"
class S3BackupAgent(BackupAgent):
"""Backup agent for the S3 integration."""
domain = DOMAIN
def __init__(self, hass: HomeAssistant, entry: S3ConfigEntry) -> None:
"""Initialize the S3 agent."""
super().__init__()
self._client = entry.runtime_data
self._bucket: str = entry.data[CONF_BUCKET]
self.name = entry.title
self.unique_id = entry.entry_id
self._backup_cache: dict[str, AgentBackup] = {}
self._cache_expiration = time()
@handle_boto_errors
async def async_download_backup(
self,
backup_id: str,
**kwargs: Any,
) -> AsyncIterator[bytes]:
"""Download a backup file.
:param backup_id: The ID of the backup that was returned in async_list_backups.
:return: An async iterator that yields bytes.
"""
backup = await self._find_backup_by_id(backup_id)
tar_filename, _ = suggested_filenames(backup)
response = await self._client.get_object(Bucket=self._bucket, Key=tar_filename)
return response["Body"].iter_chunks()
async def async_upload_backup(
self,
*,
open_stream: Callable[[], Coroutine[Any, Any, AsyncIterator[bytes]]],
backup: AgentBackup,
**kwargs: Any,
) -> None:
"""Upload a backup.
:param open_stream: A function returning an async iterator that yields bytes.
:param backup: Metadata about the backup that should be uploaded.
"""
tar_filename, metadata_filename = suggested_filenames(backup)
try:
if backup.size < MULTIPART_MIN_PART_SIZE_BYTES:
await self._upload_simple(tar_filename, open_stream)
else:
await self._upload_multipart(tar_filename, open_stream)
# Upload the metadata file
metadata_content = json.dumps(backup.as_dict())
await self._client.put_object(
Bucket=self._bucket,
Key=metadata_filename,
Body=metadata_content,
)
except BotoCoreError as err:
raise BackupAgentError("Failed to upload backup") from err
else:
# Reset cache after successful upload
self._cache_expiration = time()
async def _upload_simple(
self,
tar_filename: str,
open_stream: Callable[[], Coroutine[Any, Any, AsyncIterator[bytes]]],
) -> None:
"""Upload a small file using simple upload.
:param tar_filename: The target filename for the backup.
:param open_stream: A function returning an async iterator that yields bytes.
"""
_LOGGER.debug("Starting simple upload for %s", tar_filename)
stream = await open_stream()
file_data = bytearray()
async for chunk in stream:
file_data.extend(chunk)
await self._client.put_object(
Bucket=self._bucket,
Key=tar_filename,
Body=bytes(file_data),
)
async def _upload_multipart(
self,
tar_filename: str,
open_stream: Callable[[], Coroutine[Any, Any, AsyncIterator[bytes]]],
):
"""Upload a large file using multipart upload.
:param tar_filename: The target filename for the backup.
:param open_stream: A function returning an async iterator that yields bytes.
"""
_LOGGER.debug("Starting multipart upload for %s", tar_filename)
multipart_upload = await self._client.create_multipart_upload(
Bucket=self._bucket,
Key=tar_filename,
)
upload_id = multipart_upload["UploadId"]
try:
parts = []
part_number = 1
buffer_size = 0 # bytes
buffer: list[bytes] = []
stream = await open_stream()
async for chunk in stream:
buffer_size += len(chunk)
buffer.append(chunk)
# If buffer size meets minimum part size, upload it as a part
if buffer_size >= MULTIPART_MIN_PART_SIZE_BYTES:
_LOGGER.debug(
"Uploading part number %d, size %d", part_number, buffer_size
)
part = await self._client.upload_part(
Bucket=self._bucket,
Key=tar_filename,
PartNumber=part_number,
UploadId=upload_id,
Body=b"".join(buffer),
)
parts.append({"PartNumber": part_number, "ETag": part["ETag"]})
part_number += 1
buffer_size = 0
buffer = []
# Upload the final buffer as the last part (no minimum size requirement)
if buffer:
_LOGGER.debug(
"Uploading final part number %d, size %d", part_number, buffer_size
)
part = await self._client.upload_part(
Bucket=self._bucket,
Key=tar_filename,
PartNumber=part_number,
UploadId=upload_id,
Body=b"".join(buffer),
)
parts.append({"PartNumber": part_number, "ETag": part["ETag"]})
await self._client.complete_multipart_upload(
Bucket=self._bucket,
Key=tar_filename,
UploadId=upload_id,
MultipartUpload={"Parts": parts},
)
except BotoCoreError:
try:
await self._client.abort_multipart_upload(
Bucket=self._bucket,
Key=tar_filename,
UploadId=upload_id,
)
except BotoCoreError:
_LOGGER.exception("Failed to abort multipart upload")
raise
@handle_boto_errors
async def async_delete_backup(
self,
backup_id: str,
**kwargs: Any,
) -> None:
"""Delete a backup file.
:param backup_id: The ID of the backup that was returned in async_list_backups.
"""
backup = await self._find_backup_by_id(backup_id)
tar_filename, metadata_filename = suggested_filenames(backup)
# Delete both the backup file and its metadata file
await self._client.delete_object(Bucket=self._bucket, Key=tar_filename)
await self._client.delete_object(Bucket=self._bucket, Key=metadata_filename)
# Reset cache after successful deletion
self._cache_expiration = time()
@handle_boto_errors
async def async_list_backups(self, **kwargs: Any) -> list[AgentBackup]:
"""List backups."""
backups = await self._list_backups()
return list(backups.values())
@handle_boto_errors
async def async_get_backup(
self,
backup_id: str,
**kwargs: Any,
) -> AgentBackup:
"""Return a backup."""
return await self._find_backup_by_id(backup_id)
async def _find_backup_by_id(self, backup_id: str) -> AgentBackup:
"""Find a backup by its backup ID."""
backups = await self._list_backups()
if backup := backups.get(backup_id):
return backup
raise BackupNotFound(f"Backup {backup_id} not found")
async def _list_backups(self) -> dict[str, AgentBackup]:
"""List backups, using a cache if possible."""
if time() <= self._cache_expiration:
return self._backup_cache
backups = {}
response = await self._client.list_objects_v2(Bucket=self._bucket)
# Filter for metadata files only
metadata_files = [
obj
for obj in response.get("Contents", [])
if obj["Key"].endswith(".metadata.json")
]
for metadata_file in metadata_files:
try:
# Download and parse metadata file
metadata_response = await self._client.get_object(
Bucket=self._bucket, Key=metadata_file["Key"]
)
metadata_content = await metadata_response["Body"].read()
metadata_json = json.loads(metadata_content)
except (BotoCoreError, json.JSONDecodeError) as err:
_LOGGER.warning(
"Failed to process metadata file %s: %s",
metadata_file["Key"],
err,
)
continue
backup = AgentBackup.from_dict(metadata_json)
backups[backup.backup_id] = backup
self._backup_cache = backups
self._cache_expiration = time() + CACHE_TTL
return self._backup_cache

View File

@ -0,0 +1,93 @@
"""Config flow for the S3 integration."""
from __future__ import annotations
from typing import Any
from aiobotocore.session import AioSession
from botocore.exceptions import ClientError, ConnectionError, ParamValidationError
import voluptuous as vol
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.selector import (
TextSelector,
TextSelectorConfig,
TextSelectorType,
)
from .const import (
CONF_ACCESS_KEY_ID,
CONF_BUCKET,
CONF_ENDPOINT_URL,
CONF_SECRET_ACCESS_KEY,
DEFAULT_ENDPOINT_URL,
DESCRIPTION_AWS_S3_DOCS_URL,
DESCRIPTION_BOTO3_DOCS_URL,
DOMAIN,
)
STEP_USER_DATA_SCHEMA = vol.Schema(
{
vol.Required(CONF_ACCESS_KEY_ID): cv.string,
vol.Required(CONF_SECRET_ACCESS_KEY): TextSelector(
config=TextSelectorConfig(type=TextSelectorType.PASSWORD)
),
vol.Required(CONF_BUCKET): cv.string,
vol.Required(CONF_ENDPOINT_URL, default=DEFAULT_ENDPOINT_URL): TextSelector(
config=TextSelectorConfig(type=TextSelectorType.URL)
),
}
)
class S3ConfigFlow(ConfigFlow, domain=DOMAIN):
"""Handle a config flow."""
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle a flow initiated by the user."""
errors: dict[str, str] = {}
if user_input is not None:
self._async_abort_entries_match(
{
CONF_BUCKET: user_input[CONF_BUCKET],
CONF_ENDPOINT_URL: user_input[CONF_ENDPOINT_URL],
}
)
try:
session = AioSession()
async with session.create_client(
"s3",
endpoint_url=user_input.get(CONF_ENDPOINT_URL),
aws_secret_access_key=user_input[CONF_SECRET_ACCESS_KEY],
aws_access_key_id=user_input[CONF_ACCESS_KEY_ID],
) as client:
await client.head_bucket(Bucket=user_input[CONF_BUCKET])
except ClientError:
errors["base"] = "invalid_credentials"
except ParamValidationError as err:
if "Invalid bucket name" in str(err):
errors[CONF_BUCKET] = "invalid_bucket_name"
except ValueError:
errors[CONF_ENDPOINT_URL] = "invalid_endpoint_url"
except ConnectionError:
errors[CONF_ENDPOINT_URL] = "cannot_connect"
else:
return self.async_create_entry(
title=user_input[CONF_BUCKET], data=user_input
)
return self.async_show_form(
step_id="user",
data_schema=self.add_suggested_values_to_schema(
STEP_USER_DATA_SCHEMA, user_input
),
errors=errors,
description_placeholders={
"aws_s3_docs_url": DESCRIPTION_AWS_S3_DOCS_URL,
"boto3_docs_url": DESCRIPTION_BOTO3_DOCS_URL,
},
)

View File

@ -0,0 +1,22 @@
"""Constants for the S3 integration."""
from collections.abc import Callable
from typing import Final
from homeassistant.util.hass_dict import HassKey
DOMAIN: Final = "s3"
CONF_ACCESS_KEY_ID = "access_key_id"
CONF_SECRET_ACCESS_KEY = "secret_access_key"
CONF_ENDPOINT_URL = "endpoint_url"
CONF_BUCKET = "bucket"
DEFAULT_ENDPOINT_URL = "https://s3.eu-central-1.amazonaws.com/"
DATA_BACKUP_AGENT_LISTENERS: HassKey[list[Callable[[], None]]] = HassKey(
f"{DOMAIN}.backup_agent_listeners"
)
DESCRIPTION_AWS_S3_DOCS_URL = "https://docs.aws.amazon.com/general/latest/gr/s3.html"
DESCRIPTION_BOTO3_DOCS_URL = "https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html"

View File

@ -0,0 +1,12 @@
{
"domain": "s3",
"name": "S3",
"codeowners": ["@tomasbedrich"],
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/s3",
"integration_type": "service",
"iot_class": "cloud_push",
"loggers": ["aiobotocore"],
"quality_scale": "bronze",
"requirements": ["aiobotocore==2.21.1"]
}

View File

@ -0,0 +1,112 @@
rules:
# Bronze
action-setup:
status: exempt
comment: Integration does not register custom actions.
appropriate-polling:
status: exempt
comment: This integration does not poll.
brands: done
common-modules: done
config-flow-test-coverage: done
config-flow: done
dependency-transparency: done
docs-actions:
status: exempt
comment: This integration does not have any custom actions.
docs-high-level-description: done
docs-installation-instructions: done
docs-removal-instructions: done
entity-event-setup:
status: exempt
comment: Entities of this integration does not explicitly subscribe to events.
entity-unique-id:
status: exempt
comment: This integration does not have entities.
has-entity-name:
status: exempt
comment: This integration does not have entities.
runtime-data: done
test-before-configure: done
test-before-setup: done
unique-config-entry: done
# Silver
action-exceptions:
status: exempt
comment: Integration does not register custom actions.
config-entry-unloading: done
docs-configuration-parameters:
status: exempt
comment: This integration does not have an options flow.
docs-installation-parameters: done
entity-unavailable:
status: exempt
comment: This integration does not have entities.
integration-owner: done
log-when-unavailable: todo
parallel-updates:
status: exempt
comment: This integration does not poll.
reauthentication-flow: todo
test-coverage: done
# Gold
devices:
status: exempt
comment: This integration does not have entities.
diagnostics: todo
discovery-update-info:
status: exempt
comment: S3 is a cloud service that is not discovered on the network.
discovery:
status: exempt
comment: S3 is a cloud service that is not discovered on the network.
docs-data-update:
status: exempt
comment: This integration does not poll.
docs-examples:
status: exempt
comment: The integration extends core functionality and does not require examples.
docs-known-limitations:
status: exempt
comment: No known limitations.
docs-supported-devices:
status: exempt
comment: This integration does not support physical devices.
docs-supported-functions: done
docs-troubleshooting:
status: exempt
comment: There are no more detailed troubleshooting instructions available than what is already included in strings.json.
docs-use-cases: done
dynamic-devices:
status: exempt
comment: This integration does not have devices.
entity-category:
status: exempt
comment: This integration does not have entities.
entity-device-class:
status: exempt
comment: This integration does not have entities.
entity-disabled-by-default:
status: exempt
comment: This integration does not have entities.
entity-translations:
status: exempt
comment: This integration does not have entities.
exception-translations: done
icon-translations:
status: exempt
comment: This integration does not use icons.
reconfiguration-flow: todo
repair-issues:
status: exempt
comment: There are no issues which can be repaired.
stale-devices:
status: exempt
comment: This integration does not have devices.
# Platinum
async-dependency: done
inject-websession: todo
strict-typing: todo

View File

@ -0,0 +1,41 @@
{
"config": {
"step": {
"user": {
"data": {
"access_key_id": "Access key ID",
"secret_access_key": "Secret access key",
"bucket": "Bucket name",
"endpoint_url": "Endpoint URL"
},
"data_description": {
"access_key_id": "Access key ID to connect to S3 API",
"secret_access_key": "Secret access key to connect to S3 API",
"bucket": "Bucket must already exist and be writable by the provided credentials.",
"endpoint_url": "Endpoint URL provided to [Boto3 Session]({boto3_docs_url}). Region-specific [AWS S3 endpoints]({aws_s3_docs_url}) are available in their docs."
},
"title": "Add S3 bucket"
}
},
"error": {
"cannot_connect": "[%key:component::s3::exceptions::cannot_connect::message%]",
"invalid_bucket_name": "[%key:component::s3::exceptions::invalid_bucket_name::message%]",
"invalid_credentials": "[%key:component::s3::exceptions::invalid_credentials::message%]",
"invalid_endpoint_url": "Invalid endpoint URL"
},
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]"
}
},
"exceptions": {
"cannot_connect": {
"message": "Cannot connect to endpoint"
},
"invalid_bucket_name": {
"message": "Invalid bucket name"
},
"invalid_credentials": {
"message": "Bucket cannot be accessed using provided combination of access key ID and secret access key."
}
}
}

View File

@ -541,6 +541,7 @@ FLOWS = {
"ruuvi_gateway",
"ruuvitag_ble",
"rympro",
"s3",
"sabnzbd",
"samsungtv",
"sanix",

View File

@ -5603,6 +5603,12 @@
"config_flow": true,
"iot_class": "cloud_polling"
},
"s3": {
"name": "S3",
"integration_type": "service",
"config_flow": true,
"iot_class": "cloud_push"
},
"sabnzbd": {
"name": "SABnzbd",
"integration_type": "hub",

1
requirements_all.txt generated
View File

@ -210,6 +210,7 @@ aioazuredevops==2.2.1
aiobafi6==0.9.0
# homeassistant.components.aws
# homeassistant.components.s3
aiobotocore==2.21.1
# homeassistant.components.comelit

View File

@ -198,6 +198,7 @@ aioazuredevops==2.2.1
aiobafi6==0.9.0
# homeassistant.components.aws
# homeassistant.components.s3
aiobotocore==2.21.1
# homeassistant.components.comelit

View File

@ -0,0 +1,14 @@
"""Tests for the S3 integration."""
from homeassistant.core import HomeAssistant
from tests.common import MockConfigEntry
async def setup_integration(
hass: HomeAssistant, mock_config_entry: MockConfigEntry
) -> None:
"""Set up the S3 integration for testing."""
mock_config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(mock_config_entry.entry_id)
await hass.async_block_till_done()

View File

@ -0,0 +1,82 @@
"""Common fixtures for the S3 tests."""
from collections.abc import AsyncIterator, Generator
import json
from unittest.mock import AsyncMock, patch
import pytest
from homeassistant.components.backup import AgentBackup
from homeassistant.components.s3.backup import (
MULTIPART_MIN_PART_SIZE_BYTES,
suggested_filenames,
)
from homeassistant.components.s3.const import DOMAIN
from .const import USER_INPUT
from tests.common import MockConfigEntry
@pytest.fixture(
params=[2**20, MULTIPART_MIN_PART_SIZE_BYTES],
ids=["small", "large"],
)
def test_backup(request: pytest.FixtureRequest) -> None:
"""Test backup fixture."""
return AgentBackup(
addons=[],
backup_id="23e64aec",
date="2024-11-22T11:48:48.727189+01:00",
database_included=True,
extra_metadata={},
folders=[],
homeassistant_included=True,
homeassistant_version="2024.12.0.dev0",
name="Core 2024.12.0.dev0",
protected=False,
size=request.param,
)
@pytest.fixture(autouse=True)
def mock_client(test_backup: AgentBackup) -> Generator[AsyncMock]:
"""Mock the S3 client."""
with patch(
"aiobotocore.session.AioSession.create_client",
autospec=True,
return_value=AsyncMock(),
) as create_client:
client = create_client.return_value
tar_file, metadata_file = suggested_filenames(test_backup)
client.list_objects_v2.return_value = {
"Contents": [{"Key": tar_file}, {"Key": metadata_file}]
}
client.create_multipart_upload.return_value = {"UploadId": "upload_id"}
client.upload_part.return_value = {"ETag": "etag"}
# to simplify this mock, we assume that backup is always "iterated" over, while metadata is always "read" as a whole
class MockStream:
async def iter_chunks(self) -> AsyncIterator[bytes]:
yield b"backup data"
async def read(self) -> bytes:
return json.dumps(test_backup.as_dict()).encode()
client.get_object.return_value = {"Body": MockStream()}
client.head_bucket.return_value = {}
create_client.return_value.__aenter__.return_value = client
yield client
@pytest.fixture
def mock_config_entry() -> MockConfigEntry:
"""Return the default mocked config entry."""
return MockConfigEntry(
entry_id="test",
title="test",
domain=DOMAIN,
data=USER_INPUT,
)

View File

@ -0,0 +1,15 @@
"""Consts for S3 tests."""
from homeassistant.components.s3.const import (
CONF_ACCESS_KEY_ID,
CONF_BUCKET,
CONF_ENDPOINT_URL,
CONF_SECRET_ACCESS_KEY,
)
USER_INPUT = {
CONF_ACCESS_KEY_ID: "TestTestTestTestTest",
CONF_SECRET_ACCESS_KEY: "TestTestTestTestTestTestTestTestTestTest",
CONF_ENDPOINT_URL: "http://127.0.0.1:9000",
CONF_BUCKET: "test",
}

View File

@ -0,0 +1,470 @@
"""Test the S3 backup platform."""
from collections.abc import AsyncGenerator
from io import StringIO
import json
from time import time
from unittest.mock import AsyncMock, Mock, patch
from botocore.exceptions import ConnectTimeoutError
import pytest
from homeassistant.components.backup import DOMAIN as BACKUP_DOMAIN, AgentBackup
from homeassistant.components.s3.backup import (
MULTIPART_MIN_PART_SIZE_BYTES,
BotoCoreError,
S3BackupAgent,
async_register_backup_agents_listener,
suggested_filenames,
)
from homeassistant.components.s3.const import (
CONF_ENDPOINT_URL,
DATA_BACKUP_AGENT_LISTENERS,
DOMAIN,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers.backup import async_initialize_backup
from homeassistant.setup import async_setup_component
from . import setup_integration
from .const import USER_INPUT
from tests.common import MockConfigEntry
from tests.typing import ClientSessionGenerator, MagicMock, WebSocketGenerator
@pytest.fixture(autouse=True)
async def setup_backup_integration(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
) -> AsyncGenerator[None]:
"""Set up S3 integration."""
with (
patch("homeassistant.components.backup.is_hassio", return_value=False),
patch("homeassistant.components.backup.store.STORE_DELAY_SAVE", 0),
):
async_initialize_backup(hass)
assert await async_setup_component(hass, BACKUP_DOMAIN, {})
await setup_integration(hass, mock_config_entry)
await hass.async_block_till_done()
yield
async def test_suggested_filenames() -> None:
"""Test the suggested_filenames function."""
backup = AgentBackup(
backup_id="a1b2c3",
date="2021-01-01T01:02:03+00:00",
addons=[],
database_included=False,
extra_metadata={},
folders=[],
homeassistant_included=False,
homeassistant_version=None,
name="my_pretty_backup",
protected=False,
size=0,
)
tar_filename, metadata_filename = suggested_filenames(backup)
assert tar_filename == "my_pretty_backup_2021-01-01_01.02_03000000.tar"
assert (
metadata_filename == "my_pretty_backup_2021-01-01_01.02_03000000.metadata.json"
)
async def test_agents_info(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
mock_config_entry: MockConfigEntry,
) -> None:
"""Test backup agent info."""
client = await hass_ws_client(hass)
await client.send_json_auto_id({"type": "backup/agents/info"})
response = await client.receive_json()
assert response["success"]
assert response["result"] == {
"agents": [
{"agent_id": "backup.local", "name": "local"},
{
"agent_id": f"{DOMAIN}.{mock_config_entry.entry_id}",
"name": mock_config_entry.title,
},
],
}
async def test_agents_list_backups(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
mock_config_entry: MockConfigEntry,
test_backup: AgentBackup,
) -> None:
"""Test agent list backups."""
client = await hass_ws_client(hass)
await client.send_json_auto_id({"type": "backup/info"})
response = await client.receive_json()
assert response["success"]
assert response["result"]["agent_errors"] == {}
assert response["result"]["backups"] == [
{
"addons": test_backup.addons,
"backup_id": test_backup.backup_id,
"date": test_backup.date,
"database_included": test_backup.database_included,
"folders": test_backup.folders,
"homeassistant_included": test_backup.homeassistant_included,
"homeassistant_version": test_backup.homeassistant_version,
"name": test_backup.name,
"extra_metadata": test_backup.extra_metadata,
"agents": {
f"{DOMAIN}.{mock_config_entry.entry_id}": {
"protected": test_backup.protected,
"size": test_backup.size,
}
},
"failed_agent_ids": [],
"with_automatic_settings": None,
}
]
async def test_agents_get_backup(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
mock_config_entry: MockConfigEntry,
test_backup: AgentBackup,
) -> None:
"""Test agent get backup."""
client = await hass_ws_client(hass)
await client.send_json_auto_id(
{"type": "backup/details", "backup_id": test_backup.backup_id}
)
response = await client.receive_json()
assert response["success"]
assert response["result"]["agent_errors"] == {}
assert response["result"]["backup"] == {
"addons": test_backup.addons,
"backup_id": test_backup.backup_id,
"date": test_backup.date,
"database_included": test_backup.database_included,
"folders": test_backup.folders,
"homeassistant_included": test_backup.homeassistant_included,
"homeassistant_version": test_backup.homeassistant_version,
"name": test_backup.name,
"extra_metadata": test_backup.extra_metadata,
"agents": {
f"{DOMAIN}.{mock_config_entry.entry_id}": {
"protected": test_backup.protected,
"size": test_backup.size,
}
},
"failed_agent_ids": [],
"with_automatic_settings": None,
}
async def test_agents_get_backup_does_not_throw_on_not_found(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
mock_client: MagicMock,
) -> None:
"""Test agent get backup does not throw on a backup not found."""
mock_client.list_objects_v2.return_value = {"Contents": []}
client = await hass_ws_client(hass)
await client.send_json_auto_id({"type": "backup/details", "backup_id": "random"})
response = await client.receive_json()
assert response["success"]
assert response["result"]["agent_errors"] == {}
assert response["result"]["backup"] is None
async def test_agents_list_backups_with_corrupted_metadata(
hass: HomeAssistant,
mock_client: MagicMock,
mock_config_entry: MockConfigEntry,
caplog: pytest.LogCaptureFixture,
test_backup: AgentBackup,
) -> None:
"""Test listing backups when one metadata file is corrupted."""
# Create agent
agent = S3BackupAgent(hass, mock_config_entry)
# Set up mock responses for both valid and corrupted metadata files
mock_client.list_objects_v2.return_value = {
"Contents": [
{
"Key": "valid_backup.metadata.json",
"LastModified": "2023-01-01T00:00:00+00:00",
},
{
"Key": "corrupted_backup.metadata.json",
"LastModified": "2023-01-01T00:00:00+00:00",
},
]
}
# Mock responses for get_object calls
valid_metadata = json.dumps(test_backup.as_dict())
corrupted_metadata = "{invalid json content"
async def mock_get_object(**kwargs):
"""Mock get_object with different responses based on the key."""
key = kwargs.get("Key", "")
if "valid_backup" in key:
mock_body = AsyncMock()
mock_body.read.return_value = valid_metadata.encode()
return {"Body": mock_body}
# Corrupted metadata
mock_body = AsyncMock()
mock_body.read.return_value = corrupted_metadata.encode()
return {"Body": mock_body}
mock_client.get_object.side_effect = mock_get_object
backups = await agent.async_list_backups()
assert len(backups) == 1
assert backups[0].backup_id == test_backup.backup_id
assert "Failed to process metadata file" in caplog.text
async def test_agents_delete(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
mock_client: MagicMock,
) -> None:
"""Test agent delete backup."""
client = await hass_ws_client(hass)
await client.send_json_auto_id(
{
"type": "backup/delete",
"backup_id": "23e64aec",
}
)
response = await client.receive_json()
assert response["success"]
assert response["result"] == {"agent_errors": {}}
# Should delete both the tar and the metadata file
assert mock_client.delete_object.call_count == 2
async def test_agents_delete_not_throwing_on_not_found(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
mock_client: MagicMock,
) -> None:
"""Test agent delete backup does not throw on a backup not found."""
mock_client.list_objects_v2.return_value = {"Contents": []}
client = await hass_ws_client(hass)
await client.send_json_auto_id(
{
"type": "backup/delete",
"backup_id": "random",
}
)
response = await client.receive_json()
assert response["success"]
assert response["result"] == {"agent_errors": {}}
assert mock_client.delete_object.call_count == 0
async def test_agents_upload(
hass_client: ClientSessionGenerator,
caplog: pytest.LogCaptureFixture,
mock_client: MagicMock,
mock_config_entry: MockConfigEntry,
test_backup: AgentBackup,
) -> None:
"""Test agent upload backup."""
client = await hass_client()
with (
patch(
"homeassistant.components.backup.manager.BackupManager.async_get_backup",
return_value=test_backup,
),
patch(
"homeassistant.components.backup.manager.read_backup",
return_value=test_backup,
),
patch("pathlib.Path.open") as mocked_open,
):
# we must emit at least two chunks
# the "appendix" chunk triggers the upload of the final buffer part
mocked_open.return_value.read = Mock(
side_effect=[
b"a" * test_backup.size,
b"appendix",
b"",
]
)
resp = await client.post(
f"/api/backup/upload?agent_id={DOMAIN}.{mock_config_entry.entry_id}",
data={"file": StringIO("test")},
)
assert resp.status == 201
assert f"Uploading backup {test_backup.backup_id}" in caplog.text
if test_backup.size < MULTIPART_MIN_PART_SIZE_BYTES:
# single part + metadata both as regular upload (no multiparts)
assert mock_client.create_multipart_upload.await_count == 0
assert mock_client.put_object.await_count == 2
else:
assert "Uploading final part" in caplog.text
# 2 parts as multipart + metadata as regular upload
assert mock_client.create_multipart_upload.await_count == 1
assert mock_client.upload_part.await_count == 2
assert mock_client.complete_multipart_upload.await_count == 1
assert mock_client.put_object.await_count == 1
async def test_agents_upload_network_failure(
hass_client: ClientSessionGenerator,
caplog: pytest.LogCaptureFixture,
mock_client: MagicMock,
mock_config_entry: MockConfigEntry,
test_backup: AgentBackup,
) -> None:
"""Test agent upload backup with network failure."""
client = await hass_client()
with (
patch(
"homeassistant.components.backup.manager.BackupManager.async_get_backup",
return_value=test_backup,
),
patch(
"homeassistant.components.backup.manager.read_backup",
return_value=test_backup,
),
patch("pathlib.Path.open") as mocked_open,
):
mocked_open.return_value.read = Mock(side_effect=[b"test", b""])
# simulate network failure
mock_client.put_object.side_effect = mock_client.upload_part.side_effect = (
mock_client.abort_multipart_upload.side_effect
) = ConnectTimeoutError(endpoint_url=USER_INPUT[CONF_ENDPOINT_URL])
resp = await client.post(
f"/api/backup/upload?agent_id={DOMAIN}.{mock_config_entry.entry_id}",
data={"file": StringIO("test")},
)
assert resp.status == 201
assert "Upload failed for s3" in caplog.text
async def test_agents_download(
hass_client: ClientSessionGenerator,
mock_client: MagicMock,
mock_config_entry: MockConfigEntry,
) -> None:
"""Test agent download backup."""
client = await hass_client()
backup_id = "23e64aec"
resp = await client.get(
f"/api/backup/download/{backup_id}?agent_id={DOMAIN}.{mock_config_entry.entry_id}"
)
assert resp.status == 200
assert await resp.content.read() == b"backup data"
assert mock_client.get_object.call_count == 2 # One for metadata, one for tar file
async def test_error_during_delete(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
mock_client: MagicMock,
mock_config_entry: MockConfigEntry,
test_backup: AgentBackup,
) -> None:
"""Test the error wrapper."""
mock_client.delete_object.side_effect = BotoCoreError
client = await hass_ws_client(hass)
await client.send_json_auto_id(
{
"type": "backup/delete",
"backup_id": test_backup.backup_id,
}
)
response = await client.receive_json()
assert response["success"]
assert response["result"] == {
"agent_errors": {
f"{DOMAIN}.{mock_config_entry.entry_id}": "Failed during async_delete_backup"
}
}
async def test_cache_expiration(
hass: HomeAssistant,
mock_client: MagicMock,
test_backup: AgentBackup,
) -> None:
"""Test that the cache expires correctly."""
# Mock the entry
mock_entry = MockConfigEntry(
domain=DOMAIN,
data={"bucket": "test-bucket"},
unique_id="test-unique-id",
title="Test S3",
)
mock_entry.runtime_data = mock_client
# Create agent
agent = S3BackupAgent(hass, mock_entry)
# Mock metadata response
metadata_content = json.dumps(test_backup.as_dict())
mock_body = AsyncMock()
mock_body.read.return_value = metadata_content.encode()
mock_client.list_objects_v2.return_value = {
"Contents": [
{"Key": "test.metadata.json", "LastModified": "2023-01-01T00:00:00+00:00"}
]
}
# First call should query S3
await agent.async_list_backups()
assert mock_client.list_objects_v2.call_count == 1
assert mock_client.get_object.call_count == 1
# Second call should use cache
await agent.async_list_backups()
assert mock_client.list_objects_v2.call_count == 1
assert mock_client.get_object.call_count == 1
# Set cache to expire
agent._cache_expiration = time() - 1
# Third call should query S3 again
await agent.async_list_backups()
assert mock_client.list_objects_v2.call_count == 2
assert mock_client.get_object.call_count == 2
async def test_listeners_get_cleaned_up(hass: HomeAssistant) -> None:
"""Test listener gets cleaned up."""
listener = MagicMock()
remove_listener = async_register_backup_agents_listener(hass, listener=listener)
hass.data[DATA_BACKUP_AGENT_LISTENERS] = [
listener
] # make sure it's the last listener
remove_listener()
assert DATA_BACKUP_AGENT_LISTENERS not in hass.data

View File

@ -0,0 +1,118 @@
"""Test the S3 config flow."""
from unittest.mock import AsyncMock, patch
from botocore.exceptions import (
ClientError,
EndpointConnectionError,
ParamValidationError,
)
import pytest
from homeassistant import config_entries
from homeassistant.components.s3.const import CONF_BUCKET, CONF_ENDPOINT_URL, DOMAIN
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
from .const import USER_INPUT
from tests.common import MockConfigEntry
async def _async_start_flow(
hass: HomeAssistant,
) -> FlowResultType:
"""Initialize the config flow."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] is FlowResultType.FORM
return await hass.config_entries.flow.async_configure(
result["flow_id"],
USER_INPUT,
)
async def test_flow(hass: HomeAssistant) -> None:
"""Test config flow."""
result = await _async_start_flow(hass)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == "test"
assert result["data"] == USER_INPUT
@pytest.mark.parametrize(
("exception", "errors"),
[
(
ParamValidationError(report="Invalid bucket name"),
{CONF_BUCKET: "invalid_bucket_name"},
),
(ValueError(), {CONF_ENDPOINT_URL: "invalid_endpoint_url"}),
(
EndpointConnectionError(endpoint_url="http://example.com"),
{CONF_ENDPOINT_URL: "cannot_connect"},
),
],
)
async def test_flow_create_client_errors(
hass: HomeAssistant,
exception: Exception,
errors: dict[str, str],
) -> None:
"""Test config flow errors."""
with patch(
"aiobotocore.session.AioSession.create_client",
side_effect=exception,
):
result = await _async_start_flow(hass)
assert result["type"] is FlowResultType.FORM
assert result["errors"] == errors
# Fix and finish the test
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
USER_INPUT,
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == "test"
assert result["data"] == USER_INPUT
async def test_flow_head_bucket_error(
hass: HomeAssistant,
mock_client: AsyncMock,
) -> None:
"""Test setup_entry error when calling head_bucket."""
mock_client.head_bucket.side_effect = ClientError(
error_response={"Error": {"Code": "InvalidAccessKeyId"}},
operation_name="head_bucket",
)
result = await _async_start_flow(hass)
assert result["type"] is FlowResultType.FORM
assert result["errors"] == {"base": "invalid_credentials"}
# Fix and finish the test
mock_client.head_bucket.side_effect = None
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
USER_INPUT,
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == "test"
assert result["data"] == USER_INPUT
async def test_abort_if_already_configured(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
) -> None:
"""Test we abort if the account is already configured."""
mock_config_entry.add_to_hass(hass)
result = await _async_start_flow(hass)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "already_configured"

View File

@ -0,0 +1,75 @@
"""Test the s3 storage integration."""
from unittest.mock import AsyncMock, patch
from botocore.exceptions import (
ClientError,
EndpointConnectionError,
ParamValidationError,
)
import pytest
from homeassistant.config_entries import ConfigEntryState
from homeassistant.core import HomeAssistant
from . import setup_integration
from tests.common import MockConfigEntry
async def test_load_unload_config_entry(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
) -> None:
"""Test loading and unloading the integration."""
await setup_integration(hass, mock_config_entry)
assert mock_config_entry.state is ConfigEntryState.LOADED
await hass.config_entries.async_unload(mock_config_entry.entry_id)
await hass.async_block_till_done()
assert mock_config_entry.state is ConfigEntryState.NOT_LOADED
@pytest.mark.parametrize(
("exception", "state"),
[
(
ParamValidationError(report="Invalid bucket name"),
ConfigEntryState.SETUP_ERROR,
),
(ValueError(), ConfigEntryState.SETUP_ERROR),
(
EndpointConnectionError(endpoint_url="https://example.com"),
ConfigEntryState.SETUP_RETRY,
),
],
)
async def test_setup_entry_create_client_errors(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
exception: Exception,
state: ConfigEntryState,
) -> None:
"""Test various setup errors."""
with patch(
"aiobotocore.session.AioSession.create_client",
side_effect=exception,
):
await setup_integration(hass, mock_config_entry)
assert mock_config_entry.state is state
async def test_setup_entry_head_bucket_error(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_client: AsyncMock,
) -> None:
"""Test setup_entry error when calling head_bucket."""
mock_client.head_bucket.side_effect = ClientError(
error_response={"Error": {"Code": "InvalidAccessKeyId"}},
operation_name="head_bucket",
)
await setup_integration(hass, mock_config_entry)
assert mock_config_entry.state is ConfigEntryState.SETUP_ERROR