Files
core/homeassistant/components/zha/config_flow.py
2025-11-05 18:29:27 +00:00

1166 lines
45 KiB
Python

"""Config flow for ZHA."""
from __future__ import annotations
from abc import abstractmethod
import asyncio
import collections
from contextlib import suppress
from enum import StrEnum
import json
import os
from typing import Any
import voluptuous as vol
from zha.application.const import RadioType
import zigpy.backups
from zigpy.config import CONF_DEVICE, CONF_DEVICE_PATH, CONF_NWK_TX_POWER
from zigpy.exceptions import CannotWriteNetworkSettings, DestructiveWriteNetworkSettings
from homeassistant.components import onboarding, usb
from homeassistant.components.file_upload import process_uploaded_file
from homeassistant.components.hassio import AddonError, AddonState
from homeassistant.components.homeassistant_hardware import silabs_multiprotocol_addon
from homeassistant.components.homeassistant_hardware.firmware_config_flow import (
ZigbeeFlowStrategy,
)
from homeassistant.components.homeassistant_yellow import hardware as yellow_hardware
from homeassistant.components.usb import USBDevice, scan_serial_ports
from homeassistant.config_entries import (
SOURCE_IGNORE,
SOURCE_ZEROCONF,
ConfigEntry,
ConfigEntryBaseFlow,
ConfigEntryState,
ConfigFlow,
ConfigFlowResult,
OperationNotAllowed,
OptionsFlow,
)
from homeassistant.const import CONF_NAME
from homeassistant.core import HomeAssistant, callback
from homeassistant.data_entry_flow import AbortFlow, progress_step
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.hassio import is_hassio
from homeassistant.helpers.selector import FileSelector, FileSelectorConfig
from homeassistant.helpers.service_info.usb import UsbServiceInfo
from homeassistant.helpers.service_info.zeroconf import ZeroconfServiceInfo
from homeassistant.util import dt as dt_util
from .const import CONF_BAUDRATE, CONF_FLOW_CONTROL, CONF_RADIO_TYPE, DOMAIN
from .helpers import get_config_entry_unique_id, get_zha_gateway
from .radio_manager import (
DEVICE_SCHEMA,
HARDWARE_DISCOVERY_SCHEMA,
RECOMMENDED_RADIOS,
ProbeResult,
ZhaRadioManager,
)
# Extra entry appended to the serial port list so the user can enter a path by hand
CONF_MANUAL_PATH = "Enter Manually"
# Domain of the deCONZ integration, consulted during USB discovery
DECONZ_DOMAIN = "deconz"

# The ZHA config flow takes different branches depending on whether you are
# migrating to a new adapter via discovery or setting it up from scratch.
# For the fast path, we automatically migrate everything and restore the most recent
# backup
MIGRATION_STRATEGY_RECOMMENDED = "migration_strategy_recommended"
MIGRATION_STRATEGY_ADVANCED = "migration_strategy_advanced"

# Similarly, setup follows the same approach: we create a new network
SETUP_STRATEGY_RECOMMENDED = "setup_strategy_recommended"
SETUP_STRATEGY_ADVANCED = "setup_strategy_advanced"

# For the advanced paths, we allow users to pick how to form a network: form a brand new
# network, use the settings currently on the stick, restore from a database backup, or
# restore from a JSON backup
FORMATION_STRATEGY = "formation_strategy"
FORMATION_FORM_NEW_NETWORK = "form_new_network"
FORMATION_FORM_INITIAL_NETWORK = "form_initial_network"
FORMATION_REUSE_SETTINGS = "reuse_settings"
FORMATION_CHOOSE_AUTOMATIC_BACKUP = "choose_automatic_backup"
FORMATION_UPLOAD_MANUAL_BACKUP = "upload_manual_backup"

# Same value as FORMATION_CHOOSE_AUTOMATIC_BACKUP; used both as a formation menu
# option and as the form field key of the automatic backup selection step
CHOOSE_AUTOMATIC_BACKUP = "choose_automatic_backup"
# Form field key of the IEEE address overwrite confirmation
OVERWRITE_COORDINATOR_IEEE = "overwrite_coordinator_ieee"
# Form field key of the uploaded backup file
UPLOADED_BACKUP_FILE = "uploaded_backup_file"

# Passed as the `repair_url` placeholder when aborting because of wrong firmware
REPAIR_MY_URL = "https://my.home-assistant.io/redirect/repairs/"

# Port used for legacy zeroconf discoveries that do not provide one
LEGACY_ZEROCONF_PORT = 6638
# Port that older TubesZB devices advertise and that is replaced by the legacy port
LEGACY_ZEROCONF_ESPHOME_API_PORT = 6053

# Service type of the current (non-legacy) zeroconf discovery format
ZEROCONF_SERVICE_TYPE = "_zigbee-coordinator._tcp.local."
# Properties a zeroconf discovery must carry; additional keys are allowed
ZEROCONF_PROPERTIES_SCHEMA = vol.Schema(
    {
        vol.Required("radio_type"): vol.All(str, vol.In([t.name for t in RadioType])),
        vol.Required("serial_number"): str,
    },
    extra=vol.ALLOW_EXTRA,
)
class OptionsMigrationIntent(StrEnum):
"""Zigbee options flow intents."""
MIGRATE = "intent_migrate"
RECONFIGURE = "intent_reconfigure"
def _format_backup_choice(
    backup: zigpy.backups.NetworkBackup, *, pan_ids: bool = True
) -> str:
    """Render a network backup as a short, human-readable label.

    The label is the backup time converted to local time. When `pan_ids` is
    true, the PAN ID and extended PAN ID are appended in parentheses.
    """
    if not pan_ids:
        return dt_util.as_local(backup.backup_time).strftime("%c")

    network_info = backup.network_info
    # Drop the first two characters of the PAN ID's string form
    pan_id = str(network_info.pan_id)[2:]
    # Remove the colon separators from the extended PAN ID
    epid = str(network_info.extended_pan_id).replace(":", "")
    identifier = f"{pan_id}:{epid}".lower()

    timestamp = dt_util.as_local(backup.backup_time).strftime("%c")
    return f"{timestamp} ({identifier})"
async def list_serial_ports(hass: HomeAssistant) -> list[USBDevice]:
    """Return all selectable serial ports.

    The scanned serial ports are supplemented with the Yellow's Zigbee module
    when Yellow hardware info is available, and with the multi-PAN add-on socket
    when `is_hassio` is true and the add-on is not in the "not installed" state.
    """
    ports: list[USBDevice] = list(await hass.async_add_executor_job(scan_serial_ports))

    # Add useful info to the Yellow's serial port selection screen
    try:
        yellow_hardware.async_info(hass)
    except HomeAssistantError:
        running_on_yellow = False
    else:
        running_on_yellow = True

    if running_on_yellow:
        # PySerial does not properly handle the Yellow's serial port with the CM5
        # so we manually include it
        yellow_port = USBDevice(
            device="/dev/ttyAMA1",
            vid="ffff",  # This is technically not a USB device
            pid="ffff",
            serial_number=None,
            manufacturer="Nabu Casa",
            description="Yellow Zigbee module",
        )
        # Replace any scanned `/dev/ttyAMA*` ports with the manual entry, listed first
        ports = [
            yellow_port,
            *(p for p in ports if not p.device.startswith("/dev/ttyAMA")),
        ]

    if not is_hassio(hass):
        return ports

    # Present the multi-PAN addon as a setup option, if it's available
    multipan_manager = await silabs_multiprotocol_addon.get_multiprotocol_addon_manager(
        hass
    )

    try:
        addon_info = await multipan_manager.async_get_addon_info()
    except (AddonError, KeyError):
        addon_info = None

    if addon_info is not None and addon_info.state != AddonState.NOT_INSTALLED:
        ports.append(
            USBDevice(
                device=silabs_multiprotocol_addon.get_zigbee_socket(),
                vid="ffff",  # This is technically not a USB device
                pid="ffff",
                serial_number=None,
                manufacturer="Nabu Casa",
                description="Silicon Labs Multiprotocol add-on",
            )
        )

    return ports
class BaseZhaFlow(ConfigEntryBaseFlow):
    """Mixin for common ZHA flow steps and forms."""

    # Strategy preselected by a hardware discovery; while `None`, the setup and
    # migration strategy steps fall through to their menus
    _flow_strategy: ZigbeeFlowStrategy | None = None
    # Set to `True` once the user confirms overwriting the coordinator IEEE address
    _overwrite_ieee_during_restore: bool = False
    # Backing field of the `hass` property
    _hass: HomeAssistant
    # Title used when creating or updating the config entry
    _title: str
def __init__(self) -> None:
    """Set up the state shared by all ZHA flows."""
    super().__init__()

    # Filled in later through the `hass` property setter
    self._hass = None  # type: ignore[assignment]

    self._radio_mgr = ZhaRadioManager()
    self._extra_network_config: dict[str, Any] = {}
    self._restore_backup_task: asyncio.Task[None] | None = None
@property
def hass(self) -> HomeAssistant:
    """Return the Home Assistant instance assigned to this flow."""
    return self._hass

@hass.setter
def hass(self, hass: HomeAssistant) -> None:
    """Store the Home Assistant instance and hand it to the radio manager."""
    self._hass = hass
    # The radio manager needs the same instance
    self._radio_mgr.hass = hass
def _get_config_entry_data(self) -> dict[str, Any]:
    """Build the ZHA config entry data from the radio manager's state."""
    radio_mgr = self._radio_mgr

    assert radio_mgr.radio_type is not None
    assert radio_mgr.device_path is not None
    assert radio_mgr.device_settings is not None

    # The radio manager's device path overrides any path in the device settings
    device_config = DEVICE_SCHEMA(
        {
            **radio_mgr.device_settings,
            CONF_DEVICE_PATH: radio_mgr.device_path,
        }
    )

    return {
        CONF_DEVICE: device_config,
        CONF_RADIO_TYPE: radio_mgr.radio_type.name,
    }
@abstractmethod
async def _async_create_radio_entry(self) -> ConfigFlowResult:
    """Create a config entry from the flow's current state.

    Concrete flow handlers must provide the implementation.
    """
async def async_step_choose_serial_port(
    self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
    """Choose a serial port.

    Shows the detected serial ports plus a manual-entry option. Once a port is
    picked, the radio type is probed: the flow aborts on wrong firmware, falls
    back to manual radio type selection if probing fails, and otherwise
    continues to radio verification. If no ports are found, manual selection is
    started right away.
    """
    ports = await list_serial_ports(self.hass)

    # The full `/dev/serial/by-id/` path is too verbose to show
    resolved_paths = {
        p.device: await self.hass.async_add_executor_job(os.path.realpath, p.device)
        for p in ports
    }

    list_of_ports = [
        f"{resolved_paths[p.device]} - {p.description}{', s/n: ' + p.serial_number if p.serial_number else ''}"
        + (f" - {p.manufacturer}" if p.manufacturer else "")
        for p in ports
    ]

    if not list_of_ports:
        return await self.async_step_manual_pick_radio_type()

    list_of_ports.append(CONF_MANUAL_PATH)

    if user_input is not None:
        user_selection = user_input[CONF_DEVICE_PATH]

        if user_selection == CONF_MANUAL_PATH:
            return await self.async_step_manual_pick_radio_type()

        # Labels and ports share indices, apart from the trailing manual entry
        port = ports[list_of_ports.index(user_selection)]
        self._radio_mgr.device_path = port.device

        probe_result = await self._radio_mgr.detect_radio_type()
        if probe_result == ProbeResult.WRONG_FIRMWARE_INSTALLED:
            return self.async_abort(
                reason="wrong_firmware_installed",
                description_placeholders={"repair_url": REPAIR_MY_URL},
            )
        if probe_result == ProbeResult.PROBING_FAILED:
            # Did not autodetect anything, proceed to manual selection
            return await self.async_step_manual_pick_radio_type()

        # The manufacturer suffix must be parenthesized: otherwise the conditional
        # applies to the whole concatenated string and the title ends up empty
        # for ports that report no manufacturer
        self._title = (
            f"{port.description}{', s/n: ' + port.serial_number if port.serial_number else ''}"
            + (f" - {port.manufacturer}" if port.manufacturer else "")
        )

        return await self.async_step_verify_radio()

    # Pre-select the currently configured port
    default_port: vol.Undefined | str = vol.UNDEFINED

    if self._radio_mgr.device_path is not None:
        # `list_of_ports` has one more element (the manual entry) than `ports`
        for description, port in zip(list_of_ports, ports, strict=False):
            if port.device == self._radio_mgr.device_path:
                default_port = description
                break
        else:
            # The configured port is not among the detected ones
            default_port = CONF_MANUAL_PATH

    schema = vol.Schema(
        {
            vol.Required(CONF_DEVICE_PATH, default=default_port): vol.In(
                list_of_ports
            )
        }
    )
    return self.async_show_form(step_id="choose_serial_port", data_schema=schema)
async def async_step_manual_pick_radio_type(
    self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
    """Let the user pick the radio type by hand."""
    if user_input is not None:
        chosen_description = user_input[CONF_RADIO_TYPE]
        self._radio_mgr.radio_type = RadioType.get_by_description(
            chosen_description
        )
        return await self.async_step_manual_port_config()

    # Offer the current radio type, if there is one, as the default
    current_type = self._radio_mgr.radio_type
    default: vol.Undefined | str = (
        vol.UNDEFINED if current_type is None else current_type.description
    )

    return self.async_show_form(
        step_id="manual_pick_radio_type",
        data_schema=vol.Schema(
            {
                vol.Required(CONF_RADIO_TYPE, default=default): vol.In(
                    RadioType.list()
                )
            }
        ),
    )
async def async_step_manual_port_config(
    self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
    """Ask for the port settings of the selected radio type.

    The submitted settings are probed with the radio type's controller; on
    failure the form is shown again with a `cannot_connect` error.
    """
    assert self._radio_mgr.radio_type is not None
    errors: dict[str, str] = {}

    if user_input is not None:
        self._title = user_input[CONF_DEVICE_PATH]
        self._radio_mgr.device_path = user_input[CONF_DEVICE_PATH]

        settings = {
            CONF_DEVICE_PATH: self._radio_mgr.device_path,
            CONF_BAUDRATE: user_input[CONF_BAUDRATE],
        }
        # `None` shows up as the empty string in the frontend
        flow_control = user_input[CONF_FLOW_CONTROL]
        settings[CONF_FLOW_CONTROL] = None if flow_control == "none" else flow_control

        self._radio_mgr.device_settings = DEVICE_SCHEMA(settings)

        probe_succeeded = await self._radio_mgr.radio_type.controller.probe(
            self._radio_mgr.device_settings
        )
        if probe_succeeded:
            return await self.async_step_verify_radio()

        errors["base"] = "cannot_connect"

    # Use the previously entered settings, if any, as the form defaults
    device_settings = self._radio_mgr.device_settings or {}
    schema = vol.Schema(
        {
            vol.Required(
                CONF_DEVICE_PATH,
                default=self._radio_mgr.device_path or vol.UNDEFINED,
            ): str,
            vol.Required(
                CONF_BAUDRATE,
                default=device_settings.get(CONF_BAUDRATE) or 115200,
            ): int,
            vol.Required(
                CONF_FLOW_CONTROL,
                default=device_settings.get(CONF_FLOW_CONTROL) or "none",
            ): vol.In(["hardware", "software", "none"]),
        }
    )

    return self.async_show_form(
        step_id="manual_port_config",
        data_schema=schema,
        errors=errors,
    )
async def async_step_verify_radio(
    self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
    """Warn about non-recommended radios before continuing.

    Backups are read from the database first. The warning form is only shown
    for radios that are not in `RECOMMENDED_RADIOS`, and only until the user
    submits it.
    """
    assert self._radio_mgr.radio_type is not None

    await self._radio_mgr.async_read_backups_from_database()

    radio_type = self._radio_mgr.radio_type
    show_warning = user_input is None and radio_type not in RECOMMENDED_RADIOS

    if show_warning:
        return self.async_show_form(
            step_id="verify_radio",
            description_placeholders={
                CONF_NAME: radio_type.description,
                "docs_recommended_adapters_url": (
                    "https://www.home-assistant.io/integrations/zha/#recommended-zigbee-radio-adapters-and-modules"
                ),
            },
        )

    # ZHA disables the single instance check and will decide at runtime if we
    # are migrating or setting up from scratch
    if self.hass.config_entries.async_entries(DOMAIN, include_ignore=False):
        return await self.async_step_choose_migration_strategy()

    return await self.async_step_choose_setup_strategy()
async def async_step_choose_setup_strategy(
    self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
    """Pick the strategy for setting up the integration from scratch."""
    strategy = self._flow_strategy

    if strategy == ZigbeeFlowStrategy.RECOMMENDED:
        # Fast path: automatically form a new network
        return await self.async_step_setup_strategy_recommended()

    if strategy == ZigbeeFlowStrategy.ADVANCED:
        # Advanced path: let the user choose
        return await self.async_step_setup_strategy_advanced()

    # Allow onboarding for new users to just create a new network automatically
    onboarding_fresh_install = (
        not onboarding.async_is_onboarded(self.hass)
        and not self.hass.config_entries.async_entries(DOMAIN, include_ignore=False)
        and not self._radio_mgr.backups
    )
    if onboarding_fresh_install:
        return await self.async_step_setup_strategy_recommended()

    return self.async_show_menu(
        step_id="choose_setup_strategy",
        menu_options=[SETUP_STRATEGY_RECOMMENDED, SETUP_STRATEGY_ADVANCED],
    )
async def async_step_setup_strategy_recommended(
    self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
    """Run the recommended setup strategy, which forms a brand-new network."""
    # Any submitted input is ignored; this step only forwards to network formation
    return await self.async_step_form_new_network()
async def async_step_setup_strategy_advanced(
    self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
    """Run the advanced setup strategy, which lets the user choose how to form."""
    # Any submitted input is ignored; this step only forwards to the strategy choice
    return await self.async_step_choose_formation_strategy()
async def async_step_choose_migration_strategy(
    self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
    """Pick how the current radio's settings are handled during migration."""
    strategy = self._flow_strategy

    if strategy == ZigbeeFlowStrategy.RECOMMENDED:
        # Fast path: automatically migrate everything
        return await self.async_step_migration_strategy_recommended()

    if strategy == ZigbeeFlowStrategy.ADVANCED:
        # Advanced path: let the user choose
        return await self.async_step_migration_strategy_advanced()

    # No preselected strategy, so ask the user
    return self.async_show_menu(
        step_id="choose_migration_strategy",
        menu_options=[MIGRATION_STRATEGY_RECOMMENDED, MIGRATION_STRATEGY_ADVANCED],
    )
async def async_step_migration_strategy_recommended(
    self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
    """Run the recommended migration strategy: migrate everything automatically."""
    # Assume the most recent backup is the correct one
    # NOTE(review): indexing raises `IndexError` when no backups were loaded --
    # confirm that callers guarantee at least one backup
    first_backup = self._radio_mgr.backups[0]
    self._radio_mgr.chosen_backup = first_backup

    return await self.async_step_maybe_reset_old_radio()
@progress_step()
async def async_step_maybe_reset_old_radio(
    self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
    """Erase the old radio's network settings before migration.

    If a ZHA config entry exists, it is unloaded and the adapter it points to
    is reset. When the reset raises `HomeAssistantError`, the flow moves to the
    "plug in old radio" step; in every other case it continues with restoring
    the backup.
    """
    # Like in the options flow, pull the correct settings from the config entry
    config_entries = self.hass.config_entries.async_entries(
        DOMAIN, include_ignore=False
    )

    if config_entries:
        assert len(config_entries) == 1
        config_entry = config_entries[0]

        # Unload ZHA before connecting to the old adapter
        # (an `OperationNotAllowed` from the unload is deliberately ignored)
        with suppress(OperationNotAllowed):
            await self.hass.config_entries.async_unload(config_entry.entry_id)

        # Create a radio manager to connect to the old stick to reset it
        temp_radio_mgr = ZhaRadioManager()
        temp_radio_mgr.hass = self.hass
        temp_radio_mgr.device_path = config_entry.data[CONF_DEVICE][
            CONF_DEVICE_PATH
        ]
        temp_radio_mgr.device_settings = config_entry.data[CONF_DEVICE]
        temp_radio_mgr.radio_type = RadioType[config_entry.data[CONF_RADIO_TYPE]]

        try:
            await temp_radio_mgr.async_reset_adapter()
        except HomeAssistantError:
            # Old adapter not found or cannot connect, show prompt to plug back in
            return await self.async_step_plug_in_old_radio()

    # Nothing to reset, or the reset succeeded
    return await self.async_step_restore_backup()
async def async_step_plug_in_old_radio(
    self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
    """Ask the user to plug the old radio back in after a failed connection."""
    entries = self.hass.config_entries.async_entries(DOMAIN, include_ignore=False)

    # Unless the user removes the config entry whilst we try to reset the old radio
    # for a few seconds and then also unplugs it, we will basically never hit this
    if not entries:
        return await self.async_step_restore_backup()

    old_device_path = entries[0].data[CONF_DEVICE][CONF_DEVICE_PATH]

    return self.async_show_menu(
        step_id="plug_in_old_radio",
        menu_options=["retry_old_radio", "skip_reset_old_radio"],
        description_placeholders={"device_path": old_device_path},
    )
async def async_step_retry_old_radio(
    self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
    """Try once more to connect to and reset the old radio."""
    # Menu target: simply re-runs the reset step
    return await self.async_step_maybe_reset_old_radio()
async def async_step_skip_reset_old_radio(
    self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
    """Continue the migration without resetting the old radio."""
    # Menu target: goes straight to restoring the backup
    return await self.async_step_restore_backup()
async def async_step_pre_plug_in_new_radio(
    self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
    """Show the "plug in new radio" form without forwarding `user_input`."""
    # This step is necessary to prevent `user_input` from being passed through
    return await self.async_step_plug_in_new_radio()
async def async_step_plug_in_new_radio(
    self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
    """Ask the user to plug in the new radio after a failed connection."""
    if user_input is None:
        assert self._radio_mgr.device_path is not None
        return self.async_show_form(
            step_id="plug_in_new_radio",
            description_placeholders={"device_path": self._radio_mgr.device_path},
        )

    # User confirmed, retry now
    return await self.async_step_restore_backup()
async def async_step_migration_strategy_advanced(
    self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
    """Run the advanced migration strategy, which lets the user choose."""
    # Any submitted input is ignored; this step only forwards to the strategy choice
    return await self.async_step_choose_formation_strategy()
async def async_step_choose_formation_strategy(
    self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
    """Offer the available ways of forming or restoring a network.

    The menu options depend on the settings currently on the radio and on the
    automatic backups available.
    """
    await self._radio_mgr.async_load_network_settings()

    current_settings = self._radio_mgr.current_settings
    backups = self._radio_mgr.backups
    strategies: list[str] = []

    # Check if we have any automatic backups *and* if the backups differ from
    # the current radio settings, if they exist (since restoring would be redundant)
    if backups and (
        current_settings is None
        or any(not backup.is_compatible_with(current_settings) for backup in backups)
    ):
        strategies.append(CHOOSE_AUTOMATIC_BACKUP)

    if current_settings is None:
        # Do not show "erase network settings" if there are none to erase
        strategies += [FORMATION_UPLOAD_MANUAL_BACKUP, FORMATION_FORM_INITIAL_NETWORK]
    else:
        strategies += [
            FORMATION_REUSE_SETTINGS,
            FORMATION_UPLOAD_MANUAL_BACKUP,
            FORMATION_FORM_NEW_NETWORK,
        ]

    # Automatically form a new network if we're onboarding with a brand new radio
    if not onboarding.async_is_onboarded(self.hass) and set(strategies) == {
        FORMATION_UPLOAD_MANUAL_BACKUP,
        FORMATION_FORM_INITIAL_NETWORK,
    }:
        return await self.async_step_form_initial_network()

    # Otherwise, let the user choose
    return self.async_show_menu(
        step_id="choose_formation_strategy",
        menu_options=strategies,
    )
async def async_step_reuse_settings(
    self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
    """Keep the network settings already on the stick and create the entry."""
    # Nothing is written to the radio in this step
    return await self._async_create_radio_entry()
async def async_step_form_initial_network(
    self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
    """Form the first network on a radio."""
    # This step exists only for translations, it does nothing new
    return await self.async_step_form_new_network(user_input)
@progress_step()
async def async_step_form_new_network(
    self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
    """Form a brand-new network and create the config entry."""
    radio_mgr = self._radio_mgr

    await radio_mgr.async_form_network(config=self._extra_network_config)

    # Load the newly formed network settings to get the network info
    await radio_mgr.async_load_network_settings()

    return await self._async_create_radio_entry()
def _parse_uploaded_backup(
    self, uploaded_file_id: str
) -> zigpy.backups.NetworkBackup:
    """Read and parse an uploaded backup JSON file.

    Raises `ValueError` (including its `json.JSONDecodeError` and
    `UnicodeDecodeError` subclasses) when the file is not valid UTF-8 JSON.
    """
    with process_uploaded_file(self.hass, uploaded_file_id) as file_path:
        # JSON is UTF-8; do not depend on the locale's default encoding
        contents = file_path.read_text(encoding="utf-8")

    return zigpy.backups.NetworkBackup.from_dict(json.loads(contents))
async def async_step_upload_manual_backup(
    self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
    """Accept an uploaded coordinator backup JSON file and restore it.

    A file that fails to parse with `ValueError` re-shows the form with an
    `invalid_backup_json` error.
    """
    errors: dict[str, str] = {}

    if user_input is not None:
        uploaded_file_id = user_input[UPLOADED_BACKUP_FILE]
        try:
            # Parsing reads from disk, so it runs in the executor
            self._radio_mgr.chosen_backup = await self.hass.async_add_executor_job(
                self._parse_uploaded_backup, uploaded_file_id
            )
        except ValueError:
            errors["base"] = "invalid_backup_json"
        else:
            return await self.async_step_maybe_reset_old_radio()

    upload_schema = vol.Schema(
        {
            vol.Required(UPLOADED_BACKUP_FILE): FileSelector(
                FileSelectorConfig(accept=".json,application/json")
            )
        }
    )
    return self.async_show_form(
        step_id="upload_manual_backup",
        data_schema=upload_schema,
        errors=errors,
    )
async def async_step_choose_automatic_backup(
    self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
    """Let the user pick one of the automatic backups to restore."""
    backups = self._radio_mgr.backups

    if self.show_advanced_options:
        # Always show the PAN IDs when in advanced mode
        choices = [_format_backup_choice(backup, pan_ids=True) for backup in backups]
    else:
        # Only show the PAN IDs for multiple backups taken on the same day
        backups_per_day = collections.Counter(
            backup.backup_time.date() for backup in backups
        )
        choices = [
            _format_backup_choice(
                backup, pan_ids=(backups_per_day[backup.backup_time.date()] > 1)
            )
            for backup in backups
        ]

    if user_input is None:
        return self.async_show_form(
            step_id="choose_automatic_backup",
            data_schema=vol.Schema(
                {
                    vol.Required(CHOOSE_AUTOMATIC_BACKUP, default=choices[0]): vol.In(
                        choices
                    ),
                }
            ),
        )

    # Labels and backups share the same ordering
    selected_index = choices.index(user_input[CHOOSE_AUTOMATIC_BACKUP])
    self._radio_mgr.chosen_backup = backups[selected_index]

    return await self.async_step_maybe_reset_old_radio()
async def async_step_restore_backup(
    self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
    """Restore network backup to new radio.

    The restore runs in a background task while a progress view is shown. When
    the task is done, its outcome selects the next step: IEEE overwrite
    confirmation, the "plug in new radio" prompt, an abort, or entry creation.
    """
    # Start the restore only once; re-entries while it runs reuse the same task
    if self._restore_backup_task is None:
        self._restore_backup_task = self.hass.async_create_task(
            self._radio_mgr.restore_backup(
                overwrite_ieee=self._overwrite_ieee_during_restore
            ),
            "Restore backup",
        )

    if not self._restore_backup_task.done():
        return self.async_show_progress(
            step_id="restore_backup",
            progress_action="restore_backup",
            progress_task=self._restore_backup_task,
        )

    try:
        # The task is finished; awaiting it re-raises any exception it ended with
        await self._restore_backup_task
    except DestructiveWriteNetworkSettings:
        # If we cannot restore without overwriting the IEEE, ask for confirmation
        return self.async_show_progress_done(
            next_step_id="pre_confirm_ezsp_ieee_overwrite"
        )
    except HomeAssistantError:
        # User unplugged the new adapter, allow retry
        return self.async_show_progress_done(next_step_id="pre_plug_in_new_radio")
    except CannotWriteNetworkSettings as exc:
        return self.async_abort(
            reason="cannot_restore_backup",
            description_placeholders={"error": str(exc)},
        )
    finally:
        # Clear the task so that a later visit to this step starts a fresh restore
        self._restore_backup_task = None

    # Otherwise, proceed to entry creation
    return self.async_show_progress_done(next_step_id="create_entry")
async def async_step_pre_confirm_ezsp_ieee_overwrite(
    self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
    """Show the IEEE overwrite confirmation without forwarding `user_input`."""
    # This step is necessary to prevent `user_input` from being passed through
    return await self.async_step_confirm_ezsp_ieee_overwrite()
async def async_step_confirm_ezsp_ieee_overwrite(
    self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
    """Ask for confirmation before overwriting the EZSP IEEE address.

    Declining aborts the flow; confirming retries the restore with the
    overwrite enabled.
    """
    if user_input is not None:
        if not user_input[OVERWRITE_COORDINATOR_IEEE]:
            return self.async_abort(reason="cannot_restore_backup_no_ieee_confirm")

        self._overwrite_ieee_during_restore = True
        return await self.async_step_restore_backup()

    confirm_schema = vol.Schema(
        {vol.Required(OVERWRITE_COORDINATOR_IEEE, default=True): bool}
    )
    return self.async_show_form(
        step_id="confirm_ezsp_ieee_overwrite",
        data_schema=confirm_schema,
    )
async def async_step_create_entry(
    self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
    """Create the config entry once setup or migration has succeeded."""
    # This step only exists so that we can create entries from other steps
    return await self._async_create_radio_entry()
class ZhaConfigFlowHandler(BaseZhaFlow, ConfigFlow, domain=DOMAIN):
    """Handle a config flow.

    Entry points are the user step and the USB, zeroconf and hardware
    discovery steps.
    """

    # NOTE(review): presumably the config entry data version -- confirm against
    # the integration's entry migration code
    VERSION = 5
async def _set_unique_id_and_update_ignored_flow(
    self, unique_id: str, device_path: str
) -> None:
    """Set the flow's unique ID and refresh the path of an ignored entry.

    When an entry with this unique ID exists and was created by ignoring a
    discovery, its stored device path is updated to `device_path`.
    """
    existing_entry = await self.async_set_unique_id(unique_id)

    # Only update the current entry if it is an ignored discovery
    if not existing_entry or existing_entry.source != SOURCE_IGNORE:
        return

    updated_device = {
        **existing_entry.data.get(CONF_DEVICE, {}),
        CONF_DEVICE_PATH: device_path,
    }
    self._abort_if_unique_id_configured(updates={CONF_DEVICE: updated_device})
@staticmethod
@callback
def async_get_options_flow(
    config_entry: ConfigEntry,
) -> OptionsFlow:
    """Return the options flow handler for the given config entry."""
    options_flow = ZhaOptionsFlowHandler(config_entry)
    return options_flow
async def async_step_user(
    self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
    """Start a user-initiated ZHA config flow."""
    # A manual setup begins with the serial port selection
    return await self.async_step_choose_serial_port(user_input)
async def async_step_confirm(
    self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
    """Confirm a discovery.

    Aborts when the discovered device path is already used by a ZHA config
    entry. Otherwise the confirmation form is shown, except during onboarding
    with no ZHA entries and a non-zeroconf source, where the flow continues
    without confirmation.
    """
    self._set_confirm_only()

    zha_config_entries = self.hass.config_entries.async_entries(
        DOMAIN, include_ignore=False
    )

    if self._radio_mgr.device_path is not None:
        # Ensure the radio manager device path is unique and will match ZHA's
        try:
            self._radio_mgr.device_path = await self.hass.async_add_executor_job(
                usb.get_serial_by_id, self._radio_mgr.device_path
            )
        except OSError as error:
            raise AbortFlow(
                reason="cannot_resolve_path",
                description_placeholders={"path": self._radio_mgr.device_path},
            ) from error

        # mDNS discovery can advertise the same adapter on multiple IPs or via a
        # hostname, which should be considered a duplicate
        current_device_paths = {self._radio_mgr.device_path}

        if self.source == SOURCE_ZEROCONF:
            discovery_info = self.init_data
            # Add one socket path per advertised IP address
            current_device_paths |= {
                f"socket://{ip}:{discovery_info.port}"
                for ip in discovery_info.ip_addresses
            }

        for entry in zha_config_entries:
            path = entry.data.get(CONF_DEVICE, {}).get(CONF_DEVICE_PATH)

            # Abort discovery if the device path is already configured
            if path is not None and path in current_device_paths:
                return self.async_abort(reason="single_instance_allowed")

    # Without confirmation, discovery can automatically progress into parts of the
    # config flow logic that interacts with hardware.
    # Ignore Zeroconf discoveries during onboarding, as they may be in use already.
    if user_input is not None or (
        not onboarding.async_is_onboarded(self.hass)
        and not zha_config_entries
        and self.source != SOURCE_ZEROCONF
    ):
        # Probe the radio type if we don't have one yet
        if self._radio_mgr.radio_type is None:
            probe_result = await self._radio_mgr.detect_radio_type()
        else:
            probe_result = ProbeResult.RADIO_TYPE_DETECTED

        if probe_result == ProbeResult.WRONG_FIRMWARE_INSTALLED:
            return self.async_abort(
                reason="wrong_firmware_installed",
                description_placeholders={"repair_url": REPAIR_MY_URL},
            )

        if probe_result == ProbeResult.PROBING_FAILED:
            # This path probably will not happen now that we have
            # more precise USB matching unless there is a problem
            # with the device
            return self.async_abort(reason="usb_probe_failed")

        # Without device settings, ask the user for the port configuration
        if self._radio_mgr.device_settings is None:
            return await self.async_step_manual_port_config()

        return await self.async_step_verify_radio()

    return self.async_show_form(
        step_id="confirm",
        description_placeholders={CONF_NAME: self._title},
    )
async def async_step_usb(self, discovery_info: UsbServiceInfo) -> ConfigFlowResult:
    """Handle a USB discovery.

    The discovery is rejected when deCONZ has a flow in progress or a
    non-ignored config entry.
    """
    info = discovery_info

    await self._set_unique_id_and_update_ignored_flow(
        unique_id=(
            f"{info.vid}:{info.pid}_{info.serial_number}"
            f"_{info.manufacturer}_{info.description}"
        ),
        device_path=info.device,
    )

    # If they already have a discovery for deconz we ignore the usb discovery as
    # they probably want to use it there instead
    if self.hass.config_entries.flow.async_progress_by_handler(DECONZ_DOMAIN):
        return self.async_abort(reason="not_zha_device")

    deconz_entries = self.hass.config_entries.async_entries(DECONZ_DOMAIN)
    if any(entry.source != SOURCE_IGNORE for entry in deconz_entries):
        return self.async_abort(reason="not_zha_device")

    self._radio_mgr.device_path = info.device
    # Prefer the device description and fall back to a generated name
    self._title = info.description or usb.human_readable_device_name(
        info.device,
        info.serial_number,
        info.manufacturer,
        info.description,
        info.vid,
        info.pid,
    )
    self.context["title_placeholders"] = {CONF_NAME: self._title}

    return await self.async_step_confirm()
async def async_step_zeroconf(
    self, discovery_info: ZeroconfServiceInfo
) -> ConfigFlowResult:
    """Handle zeroconf discovery.

    Discoveries whose service type is not `ZEROCONF_SERVICE_TYPE` are first
    rewritten into that format. The properties are then validated against
    `ZEROCONF_PROPERTIES_SCHEMA` before the flow proceeds to confirmation.
    """
    # Transform legacy zeroconf discovery into the new format
    if discovery_info.type != ZEROCONF_SERVICE_TYPE:
        port = discovery_info.port or LEGACY_ZEROCONF_PORT
        name = discovery_info.name

        # Fix incorrect port for older TubesZB devices
        if "tube" in name and port == LEGACY_ZEROCONF_ESPHOME_API_PORT:
            port = LEGACY_ZEROCONF_PORT

        # Determine the radio type
        # (an explicit property wins, otherwise it is guessed from the name)
        if "radio_type" in discovery_info.properties:
            radio_type = discovery_info.properties["radio_type"]
        elif "efr32" in name:
            radio_type = RadioType.ezsp.name
        elif "zigate" in name:
            radio_type = RadioType.zigate.name
        else:
            radio_type = RadioType.znp.name

        # Title: the `name` property, or the part of the name before "._"
        fallback_title = name.split("._", 1)[0]
        title = discovery_info.properties.get("name", fallback_title)

        discovery_info = ZeroconfServiceInfo(
            ip_address=discovery_info.ip_address,
            ip_addresses=discovery_info.ip_addresses,
            port=port,
            hostname=discovery_info.hostname,
            type=ZEROCONF_SERVICE_TYPE,
            name=f"{title}.{ZEROCONF_SERVICE_TYPE}",
            properties={
                "radio_type": radio_type,
                # To maintain backwards compatibility
                "serial_number": discovery_info.hostname.removesuffix(".local."),
            },
        )

    try:
        discovery_props = ZEROCONF_PROPERTIES_SCHEMA(discovery_info.properties)
    except vol.Invalid:
        return self.async_abort(reason="invalid_zeroconf_data")

    radio_type = self._radio_mgr.parse_radio_type(discovery_props["radio_type"])
    device_path = f"socket://{discovery_info.host}:{discovery_info.port}"
    title = discovery_info.name.removesuffix(f".{ZEROCONF_SERVICE_TYPE}")

    await self._set_unique_id_and_update_ignored_flow(
        unique_id=discovery_props["serial_number"],
        device_path=device_path,
    )

    self.context["title_placeholders"] = {CONF_NAME: title}
    self._title = title
    self._radio_mgr.device_path = device_path
    self._radio_mgr.radio_type = radio_type
    # Socket connections use fixed serial settings
    self._radio_mgr.device_settings = DEVICE_SCHEMA(
        {
            CONF_DEVICE_PATH: device_path,
            CONF_BAUDRATE: 115200,
            CONF_FLOW_CONTROL: None,
        }
    )

    return await self.async_step_confirm()
async def async_step_hardware(
    self, data: dict[str, Any] | None = None
) -> ConfigFlowResult:
    """Handle a flow started by a hardware integration.

    The data is validated against `HARDWARE_DISCOVERY_SCHEMA`; invalid data
    aborts the flow.
    """
    try:
        discovery_data = HARDWARE_DISCOVERY_SCHEMA(data)
    except vol.Invalid:
        return self.async_abort(reason="invalid_hardware_data")

    name = discovery_data["name"]
    radio_type = self._radio_mgr.parse_radio_type(discovery_data["radio_type"])
    port_settings = discovery_data["port"]
    device_path = port_settings[CONF_DEVICE_PATH]

    # Optional hints supplied by the hardware integration
    self._flow_strategy = discovery_data.get("flow_strategy")
    if "tx_power" in discovery_data:
        self._extra_network_config[CONF_NWK_TX_POWER] = discovery_data["tx_power"]

    unique_id = f"{name}_{radio_type.name}_{device_path}"
    await self._set_unique_id_and_update_ignored_flow(
        unique_id=unique_id,
        device_path=device_path,
    )

    self._title = name
    self._radio_mgr.radio_type = radio_type
    self._radio_mgr.device_path = device_path
    self._radio_mgr.device_settings = port_settings
    self.context["title_placeholders"] = {CONF_NAME: name}

    return await self.async_step_confirm()
async def _async_create_radio_entry(self) -> ConfigFlowResult:
    """Finish the flow by creating a ZHA entry or updating the existing one.

    With exactly one non-ignored ZHA entry, that entry is updated with the
    flow's data and reloaded. With none, the network settings are loaded from
    the radio to derive the unique ID and a new entry is created. Any other
    number of entries aborts with `single_instance_allowed`.
    """
    # ZHA is still single instance only, even though we use discovery to allow for
    # migrating to a new radio
    existing_entries = self.hass.config_entries.async_entries(
        DOMAIN, include_ignore=False
    )
    entry_data = self._get_config_entry_data()

    if len(existing_entries) > 1:
        # This should never be reached
        return self.async_abort(reason="single_instance_allowed")

    if existing_entries:
        (current_entry,) = existing_entries
        return self.async_update_reload_and_abort(
            entry=current_entry,
            title=self._title,
            data=entry_data,
            reload_even_if_entry_is_unchanged=True,
            reason="reconfigure_successful",
        )

    # Load network settings from the radio to get the EPID
    await self._radio_mgr.async_load_network_settings()
    loaded_settings = self._radio_mgr.current_settings
    assert loaded_settings is not None

    await self.async_set_unique_id(
        get_config_entry_unique_id(loaded_settings.network_info)
    )
    return self.async_create_entry(title=self._title, data=entry_data)
class ZhaOptionsFlowHandler(BaseZhaFlow, OptionsFlow):
    """Options flow used to re-configure ZHA or migrate it to another adapter."""

    # Assigned by the `intent_*` steps and consulted by
    # `async_step_maybe_reset_old_radio`
    _migration_intent: OptionsMigrationIntent

    def __init__(self, config_entry: ConfigEntry) -> None:
        """Seed the radio manager and title from the existing config entry."""
        super().__init__()

        device_config = config_entry.data[CONF_DEVICE]
        self._radio_mgr.device_path = device_config[CONF_DEVICE_PATH]
        self._radio_mgr.device_settings = device_config
        self._radio_mgr.radio_type = RadioType[config_entry.data[CONF_RADIO_TYPE]]
        self._title = config_entry.title

    async def async_step_init(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Show the initial form; once submitted, back up and unload ZHA."""
        if user_input is None:
            return self.async_show_form(step_id="init")

        # Perform a backup first
        try:
            gateway = get_zha_gateway(self.hass)
        except ValueError:
            # No gateway could be obtained, so there is nothing to back up
            pass
        else:
            # The backup itself will be stored in `zigbee.db`, which the radio
            # manager will read when the class is initialized
            controller = gateway.application_controller
            await controller.backups.create_backup(load_devices=True)

        # Then unload the integration
        entry_id = self.config_entry.entry_id
        with suppress(OperationNotAllowed):
            # OperationNotAllowed: ZHA is not running
            await self.hass.config_entries.async_unload(entry_id)

        return await self.async_step_prompt_migrate_or_reconfigure()

    async def async_step_prompt_migrate_or_reconfigure(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Show a menu to choose between re-configuring and migrating adapters."""
        intent_choices = [
            OptionsMigrationIntent.RECONFIGURE,
            OptionsMigrationIntent.MIGRATE,
        ]
        return self.async_show_menu(
            step_id="prompt_migrate_or_reconfigure",
            menu_options=intent_choices,
        )

    async def async_step_intent_reconfigure(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Record the reconfigure intent, then continue to serial port selection."""
        self._migration_intent = OptionsMigrationIntent.RECONFIGURE
        return await self.async_step_choose_serial_port()

    async def async_step_intent_migrate(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Record the migrate intent, then continue to serial port selection."""
        self._migration_intent = OptionsMigrationIntent.MIGRATE
        return await self.async_step_choose_serial_port()

    async def async_step_maybe_reset_old_radio(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Reset the old radio via the base flow, unless we are reconfiguring."""
        if self._migration_intent is not OptionsMigrationIntent.RECONFIGURE:
            return await super().async_step_maybe_reset_old_radio(user_input)

        # If we are reconfiguring, the old radio will not be available
        return await self.async_step_restore_backup()

    async def _async_create_radio_entry(self):
        """Write the new data to the existing entry instead of creating one."""
        entry = self.config_entry

        # Avoid creating both `.options` and `.data` by directly writing `data` here
        self.hass.config_entries.async_update_entry(
            entry=entry,
            data=self._get_config_entry_data(),
            options=entry.options,
        )

        # Reload ZHA after we finish
        await self.hass.config_entries.async_setup(self.config_entry.entry_id)
        return self.async_abort(reason="reconfigure_successful")

    def async_remove(self):
        """Schedule ZHA setup again if the entry is not loaded or failed setup."""
        needs_setup = self.config_entry.state in (
            ConfigEntryState.SETUP_ERROR,
            ConfigEntryState.NOT_LOADED,
        )
        if needs_setup:
            self.hass.async_create_task(
                self.hass.config_entries.async_setup(self.config_entry.entry_id)
            )