Files
core/homeassistant/components/backblaze_b2/config_flow.py
ElCruncharino dcb2087f4b Add backblaze b2 backup integration (#149627)
Co-authored-by: Hugo van Rijswijk <git@hugovr.nl>
Co-authored-by: ElCruncharino <ElCruncharino@users.noreply.github.com>
Co-authored-by: Erik Montnemery <erik@montnemery.com>
2025-10-30 08:42:02 +01:00

289 lines
10 KiB
Python

"""Config flow for the Backblaze B2 integration."""
from __future__ import annotations
from collections.abc import Mapping
import logging
from typing import Any
from b2sdk.v2 import B2Api, InMemoryAccountInfo, exception
import voluptuous as vol
from homeassistant.config_entries import ConfigEntry, ConfigFlow, ConfigFlowResult
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.selector import (
TextSelector,
TextSelectorConfig,
TextSelectorType,
)
from .const import (
BACKBLAZE_REALM,
CONF_APPLICATION_KEY,
CONF_BUCKET,
CONF_KEY_ID,
CONF_PREFIX,
DOMAIN,
)
_LOGGER = logging.getLogger(__name__)
# Constants
REQUIRED_CAPABILITIES = {"writeFiles", "listFiles", "deleteFiles", "readFiles"}
STEP_USER_DATA_SCHEMA = vol.Schema(
{
vol.Required(CONF_KEY_ID): cv.string,
vol.Required(CONF_APPLICATION_KEY): TextSelector(
config=TextSelectorConfig(type=TextSelectorType.PASSWORD)
),
vol.Required(CONF_BUCKET): cv.string,
vol.Optional(CONF_PREFIX, default=""): cv.string,
}
)
class BackblazeConfigFlow(ConfigFlow, domain=DOMAIN):
"""Handle a config flow for Backblaze B2."""
VERSION = 1
reauth_entry: ConfigEntry[Any] | None
def _abort_if_duplicate_credentials(self, user_input: dict[str, Any]) -> None:
"""Abort if credentials already exist in another entry."""
self._async_abort_entries_match(
{
CONF_KEY_ID: user_input[CONF_KEY_ID],
CONF_APPLICATION_KEY: user_input[CONF_APPLICATION_KEY],
}
)
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle a flow initiated by the user."""
errors: dict[str, str] = {}
placeholders: dict[str, str] = {}
if user_input is not None:
self._abort_if_duplicate_credentials(user_input)
errors, placeholders = await self._async_validate_backblaze_connection(
user_input
)
if not errors:
if user_input[CONF_PREFIX] and not user_input[CONF_PREFIX].endswith(
"/"
):
user_input[CONF_PREFIX] += "/"
return self.async_create_entry(
title=user_input[CONF_BUCKET], data=user_input
)
return self.async_show_form(
step_id="user",
data_schema=self.add_suggested_values_to_schema(
STEP_USER_DATA_SCHEMA, user_input
),
errors=errors,
description_placeholders={"brand_name": "Backblaze B2", **placeholders},
)
async def _async_validate_backblaze_connection(
self, user_input: dict[str, Any]
) -> tuple[dict[str, str], dict[str, str]]:
"""Validate Backblaze B2 credentials, bucket, capabilities, and prefix.
Returns a tuple of (errors_dict, placeholders_dict).
"""
errors: dict[str, str] = {}
placeholders: dict[str, str] = {}
info = InMemoryAccountInfo()
b2_api = B2Api(info)
def _authorize_and_get_bucket_sync() -> None:
"""Synchronously authorize the account and get the bucket by name.
This function is run in the executor because b2sdk operations are blocking.
"""
b2_api.authorize_account(
BACKBLAZE_REALM, # Use the defined realm constant
user_input[CONF_KEY_ID],
user_input[CONF_APPLICATION_KEY],
)
b2_api.get_bucket_by_name(user_input[CONF_BUCKET])
try:
await self.hass.async_add_executor_job(_authorize_and_get_bucket_sync)
allowed = b2_api.account_info.get_allowed()
# Check if allowed info is available
if allowed is None or not allowed.get("capabilities"):
errors["base"] = "invalid_capability"
placeholders["missing_capabilities"] = ", ".join(
sorted(REQUIRED_CAPABILITIES)
)
else:
# Check if all required capabilities are present
current_caps = set(allowed["capabilities"])
if not REQUIRED_CAPABILITIES.issubset(current_caps):
missing_caps = REQUIRED_CAPABILITIES - current_caps
_LOGGER.warning(
"Missing required Backblaze B2 capabilities for Key ID '%s': %s",
user_input[CONF_KEY_ID],
", ".join(sorted(missing_caps)),
)
errors["base"] = "invalid_capability"
placeholders["missing_capabilities"] = ", ".join(
sorted(missing_caps)
)
else:
# Only check prefix if capabilities are valid
configured_prefix: str = user_input[CONF_PREFIX]
allowed_prefix = allowed.get("namePrefix") or ""
# Ensure configured prefix starts with Backblaze B2's allowed prefix
if allowed_prefix and not configured_prefix.startswith(
allowed_prefix
):
errors[CONF_PREFIX] = "invalid_prefix"
placeholders["allowed_prefix"] = allowed_prefix
except exception.Unauthorized:
_LOGGER.debug(
"Backblaze B2 authentication failed for Key ID '%s'",
user_input[CONF_KEY_ID],
)
errors["base"] = "invalid_credentials"
except exception.RestrictedBucket as err:
_LOGGER.debug(
"Access to Backblaze B2 bucket '%s' is restricted: %s",
user_input[CONF_BUCKET],
err,
)
placeholders["restricted_bucket_name"] = err.bucket_name
errors[CONF_BUCKET] = "restricted_bucket"
except exception.NonExistentBucket:
_LOGGER.debug(
"Backblaze B2 bucket '%s' does not exist", user_input[CONF_BUCKET]
)
errors[CONF_BUCKET] = "invalid_bucket_name"
except exception.ConnectionReset:
_LOGGER.error("Failed to connect to Backblaze B2. Connection reset")
errors["base"] = "cannot_connect"
except exception.MissingAccountData:
# This generally indicates an issue with how InMemoryAccountInfo is used
_LOGGER.error(
"Missing account data during Backblaze B2 authorization for Key ID '%s'",
user_input[CONF_KEY_ID],
)
errors["base"] = "invalid_credentials"
except Exception:
_LOGGER.exception(
"An unexpected error occurred during Backblaze B2 configuration for Key ID '%s'",
user_input[CONF_KEY_ID],
)
errors["base"] = "unknown"
return errors, placeholders
async def async_step_reauth(
self, entry_data: Mapping[str, Any]
) -> ConfigFlowResult:
"""Handle reauthentication flow."""
self.reauth_entry = self.hass.config_entries.async_get_entry(
self.context["entry_id"]
)
assert self.reauth_entry is not None
return await self.async_step_reauth_confirm()
async def async_step_reauth_confirm(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Confirm reauthentication."""
assert self.reauth_entry is not None
errors: dict[str, str] = {}
placeholders: dict[str, str] = {}
if user_input is not None:
self._abort_if_duplicate_credentials(user_input)
validation_input = {
CONF_KEY_ID: user_input[CONF_KEY_ID],
CONF_APPLICATION_KEY: user_input[CONF_APPLICATION_KEY],
CONF_BUCKET: self.reauth_entry.data[CONF_BUCKET],
CONF_PREFIX: self.reauth_entry.data[CONF_PREFIX],
}
errors, placeholders = await self._async_validate_backblaze_connection(
validation_input
)
if not errors:
return self.async_update_reload_and_abort(
self.reauth_entry,
data_updates={
CONF_KEY_ID: user_input[CONF_KEY_ID],
CONF_APPLICATION_KEY: user_input[CONF_APPLICATION_KEY],
},
)
return self.async_show_form(
step_id="reauth_confirm",
data_schema=vol.Schema(
{
vol.Required(CONF_KEY_ID): cv.string,
vol.Required(CONF_APPLICATION_KEY): TextSelector(
config=TextSelectorConfig(type=TextSelectorType.PASSWORD)
),
}
),
errors=errors,
description_placeholders={
"brand_name": "Backblaze B2",
"bucket": self.reauth_entry.data[CONF_BUCKET],
**placeholders,
},
)
async def async_step_reconfigure(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle reconfiguration flow."""
entry = self.hass.config_entries.async_get_entry(self.context["entry_id"])
assert entry is not None
if user_input is not None:
self._abort_if_duplicate_credentials(user_input)
errors, placeholders = await self._async_validate_backblaze_connection(
user_input
)
if not errors:
if user_input[CONF_PREFIX] and not user_input[CONF_PREFIX].endswith(
"/"
):
user_input[CONF_PREFIX] += "/"
return self.async_update_reload_and_abort(
entry,
data_updates=user_input,
)
else:
errors = {}
placeholders = {}
return self.async_show_form(
step_id="reconfigure",
data_schema=self.add_suggested_values_to_schema(
STEP_USER_DATA_SCHEMA, user_input or entry.data
),
errors=errors,
description_placeholders={"brand_name": "Backblaze B2", **placeholders},
)