"""Config flow for Z-Wave JS integration.""" from __future__ import annotations import asyncio from contextlib import suppress from datetime import datetime import logging from pathlib import Path from typing import Any import aiohttp from awesomeversion import AwesomeVersion from serial.tools import list_ports import voluptuous as vol from zwave_js_server.client import Client from zwave_js_server.exceptions import FailedCommand from zwave_js_server.model.driver import Driver from zwave_js_server.version import VersionInfo, get_server_version from homeassistant.components import usb from homeassistant.components.hassio import ( AddonError, AddonInfo, AddonManager, AddonState, ) from homeassistant.config_entries import ( SOURCE_USB, ConfigEntry, ConfigEntryState, ConfigFlow, ConfigFlowResult, ) from homeassistant.const import CONF_NAME, CONF_URL from homeassistant.core import HomeAssistant, callback from homeassistant.data_entry_flow import AbortFlow from homeassistant.exceptions import HomeAssistantError from homeassistant.helpers.aiohttp_client import async_get_clientsession from homeassistant.helpers.hassio import is_hassio from homeassistant.helpers.service_info.hassio import HassioServiceInfo from homeassistant.helpers.service_info.usb import UsbServiceInfo from homeassistant.helpers.service_info.zeroconf import ZeroconfServiceInfo from homeassistant.helpers.typing import VolDictType from .addon import get_addon_manager from .const import ( ADDON_SLUG, CONF_ADDON_DEVICE, CONF_ADDON_EMULATE_HARDWARE, CONF_ADDON_LOG_LEVEL, CONF_ADDON_LR_S2_ACCESS_CONTROL_KEY, CONF_ADDON_LR_S2_AUTHENTICATED_KEY, CONF_ADDON_NETWORK_KEY, CONF_ADDON_S0_LEGACY_KEY, CONF_ADDON_S2_ACCESS_CONTROL_KEY, CONF_ADDON_S2_AUTHENTICATED_KEY, CONF_ADDON_S2_UNAUTHENTICATED_KEY, CONF_INTEGRATION_CREATED_ADDON, CONF_LR_S2_ACCESS_CONTROL_KEY, CONF_LR_S2_AUTHENTICATED_KEY, CONF_S0_LEGACY_KEY, CONF_S2_ACCESS_CONTROL_KEY, CONF_S2_AUTHENTICATED_KEY, CONF_S2_UNAUTHENTICATED_KEY, CONF_USB_PATH, CONF_USE_ADDON, DATA_CLIENT, DOMAIN, RESTORE_NVM_DRIVER_READY_TIMEOUT, ) _LOGGER = logging.getLogger(__name__) DEFAULT_URL = "ws://localhost:3000" TITLE = "Z-Wave JS" ADDON_SETUP_TIMEOUT = 5 ADDON_SETUP_TIMEOUT_ROUNDS = 40 CONF_EMULATE_HARDWARE = "emulate_hardware" CONF_LOG_LEVEL = "log_level" SERVER_VERSION_TIMEOUT = 10 ADDON_LOG_LEVELS = { "error": "Error", "warn": "Warn", "info": "Info", "verbose": "Verbose", "debug": "Debug", "silly": "Silly", } ADDON_USER_INPUT_MAP = { CONF_ADDON_DEVICE: CONF_USB_PATH, CONF_ADDON_S0_LEGACY_KEY: CONF_S0_LEGACY_KEY, CONF_ADDON_S2_ACCESS_CONTROL_KEY: CONF_S2_ACCESS_CONTROL_KEY, CONF_ADDON_S2_AUTHENTICATED_KEY: CONF_S2_AUTHENTICATED_KEY, CONF_ADDON_S2_UNAUTHENTICATED_KEY: CONF_S2_UNAUTHENTICATED_KEY, CONF_ADDON_LR_S2_ACCESS_CONTROL_KEY: CONF_LR_S2_ACCESS_CONTROL_KEY, CONF_ADDON_LR_S2_AUTHENTICATED_KEY: CONF_LR_S2_AUTHENTICATED_KEY, CONF_ADDON_LOG_LEVEL: CONF_LOG_LEVEL, CONF_ADDON_EMULATE_HARDWARE: CONF_EMULATE_HARDWARE, } ON_SUPERVISOR_SCHEMA = vol.Schema({vol.Optional(CONF_USE_ADDON, default=True): bool}) MIN_MIGRATION_SDK_VERSION = AwesomeVersion("6.61") def get_manual_schema(user_input: dict[str, Any]) -> vol.Schema: """Return a schema for the manual step.""" default_url = user_input.get(CONF_URL, DEFAULT_URL) return vol.Schema({vol.Required(CONF_URL, default=default_url): str}) def get_on_supervisor_schema(user_input: dict[str, Any]) -> vol.Schema: """Return a schema for the on Supervisor step.""" default_use_addon = user_input[CONF_USE_ADDON] return vol.Schema({vol.Optional(CONF_USE_ADDON, default=default_use_addon): bool}) async def validate_input(hass: HomeAssistant, user_input: dict) -> VersionInfo: """Validate if the user input allows us to connect.""" ws_address = user_input[CONF_URL] if not ws_address.startswith(("ws://", "wss://")): raise InvalidInput("invalid_ws_url") try: return await async_get_version_info(hass, ws_address) except CannotConnect as err: raise InvalidInput("cannot_connect") from err async def async_get_version_info(hass: HomeAssistant, ws_address: str) -> VersionInfo: """Return Z-Wave JS version info.""" try: async with asyncio.timeout(SERVER_VERSION_TIMEOUT): version_info: VersionInfo = await get_server_version( ws_address, async_get_clientsession(hass) ) except (TimeoutError, aiohttp.ClientError) as err: # We don't want to spam the log if the add-on isn't started # or takes a long time to start. _LOGGER.debug("Failed to connect to Z-Wave JS server: %s", err) raise CannotConnect from err return version_info def get_usb_ports() -> dict[str, str]: """Return a dict of USB ports and their friendly names.""" ports = list_ports.comports() port_descriptions = {} for port in ports: vid: str | None = None pid: str | None = None if port.vid is not None and port.pid is not None: usb_device = usb.usb_device_from_port(port) vid = usb_device.vid pid = usb_device.pid dev_path = usb.get_serial_by_id(port.device) human_name = usb.human_readable_device_name( dev_path, port.serial_number, port.manufacturer, port.description, vid, pid, ) port_descriptions[dev_path] = human_name return port_descriptions async def async_get_usb_ports(hass: HomeAssistant) -> dict[str, str]: """Return a dict of USB ports and their friendly names.""" return await hass.async_add_executor_job(get_usb_ports) class ZWaveJSConfigFlow(ConfigFlow, domain=DOMAIN): """Handle a config flow for Z-Wave JS.""" VERSION = 1 _title: str def __init__(self) -> None: """Set up flow instance.""" self.s0_legacy_key: str | None = None self.s2_access_control_key: str | None = None self.s2_authenticated_key: str | None = None self.s2_unauthenticated_key: str | None = None self.lr_s2_access_control_key: str | None = None self.lr_s2_authenticated_key: str | None = None self.usb_path: str | None = None self.ws_address: str | None = None self.restart_addon: bool = False # If we install the add-on we should uninstall it on entry remove. self.integration_created_addon = False self.install_task: asyncio.Task | None = None self.start_task: asyncio.Task | None = None self.version_info: VersionInfo | None = None self.original_addon_config: dict[str, Any] | None = None self.revert_reason: str | None = None self.backup_task: asyncio.Task | None = None self.restore_backup_task: asyncio.Task | None = None self.backup_data: bytes | None = None self.backup_filepath: str | None = None self.use_addon = False self._migrating = False self._reconfigure_config_entry: ConfigEntry | None = None self._usb_discovery = False async def async_step_install_addon( self, user_input: dict[str, Any] | None = None ) -> ConfigFlowResult: """Install Z-Wave JS add-on.""" if not self.install_task: self.install_task = self.hass.async_create_task(self._async_install_addon()) if not self.install_task.done(): return self.async_show_progress( step_id="install_addon", progress_action="install_addon", progress_task=self.install_task, ) try: await self.install_task except AddonError as err: _LOGGER.error(err) return self.async_show_progress_done(next_step_id="install_failed") finally: self.install_task = None self.integration_created_addon = True return self.async_show_progress_done(next_step_id="configure_addon") async def async_step_install_failed( self, user_input: dict[str, Any] | None = None ) -> ConfigFlowResult: """Add-on installation failed.""" return self.async_abort(reason="addon_install_failed") async def async_step_start_addon( self, user_input: dict[str, Any] | None = None ) -> ConfigFlowResult: """Start Z-Wave JS add-on.""" if not self.start_task: self.start_task = self.hass.async_create_task(self._async_start_addon()) if not self.start_task.done(): return self.async_show_progress( step_id="start_addon", progress_action="start_addon", progress_task=self.start_task, ) try: await self.start_task except (CannotConnect, AddonError, AbortFlow) as err: _LOGGER.error(err) return self.async_show_progress_done(next_step_id="start_failed") finally: self.start_task = None return self.async_show_progress_done(next_step_id="finish_addon_setup") async def async_step_start_failed( self, user_input: dict[str, Any] | None = None ) -> ConfigFlowResult: """Add-on start failed.""" if self._migrating: return self.async_abort(reason="addon_start_failed") if self._reconfigure_config_entry: return await self.async_revert_addon_config(reason="addon_start_failed") return self.async_abort(reason="addon_start_failed") async def _async_start_addon(self) -> None: """Start the Z-Wave JS add-on.""" addon_manager: AddonManager = get_addon_manager(self.hass) self.version_info = None if self.restart_addon: await addon_manager.async_schedule_restart_addon() else: await addon_manager.async_schedule_start_addon() # Sleep some seconds to let the add-on start properly before connecting. for _ in range(ADDON_SETUP_TIMEOUT_ROUNDS): await asyncio.sleep(ADDON_SETUP_TIMEOUT) try: if not self.ws_address: discovery_info = await self._async_get_addon_discovery_info() self.ws_address = ( f"ws://{discovery_info['host']}:{discovery_info['port']}" ) self.version_info = await async_get_version_info( self.hass, self.ws_address ) except (AbortFlow, CannotConnect) as err: _LOGGER.debug( "Add-on not ready yet, waiting %s seconds: %s", ADDON_SETUP_TIMEOUT, err, ) else: break else: raise CannotConnect("Failed to start Z-Wave JS add-on: timeout") async def async_step_configure_addon( self, user_input: dict[str, Any] | None = None ) -> ConfigFlowResult: """Ask for config for Z-Wave JS add-on.""" if self._reconfigure_config_entry: return await self.async_step_configure_addon_reconfigure(user_input) return await self.async_step_configure_addon_user(user_input) async def async_step_finish_addon_setup( self, user_input: dict[str, Any] | None = None ) -> ConfigFlowResult: """Prepare info needed to complete the config entry. Get add-on discovery info and server version info. Set unique id and abort if already configured. """ if self._migrating: return await self.async_step_finish_addon_setup_migrate(user_input) if self._reconfigure_config_entry: return await self.async_step_finish_addon_setup_reconfigure(user_input) return await self.async_step_finish_addon_setup_user(user_input) async def _async_get_addon_info(self) -> AddonInfo: """Return and cache Z-Wave JS add-on info.""" addon_manager: AddonManager = get_addon_manager(self.hass) try: addon_info: AddonInfo = await addon_manager.async_get_addon_info() except AddonError as err: _LOGGER.error(err) raise AbortFlow("addon_info_failed") from err return addon_info async def _async_set_addon_config(self, config_updates: dict) -> None: """Set Z-Wave JS add-on config.""" addon_info = await self._async_get_addon_info() addon_config = addon_info.options new_addon_config = addon_config | config_updates if new_addon_config == addon_config: return if addon_info.state == AddonState.RUNNING: self.restart_addon = True # Copy the add-on config to keep the objects separate. self.original_addon_config = dict(addon_config) # Remove legacy network_key new_addon_config.pop(CONF_ADDON_NETWORK_KEY, None) addon_manager: AddonManager = get_addon_manager(self.hass) try: await addon_manager.async_set_addon_options(new_addon_config) except AddonError as err: _LOGGER.error(err) raise AbortFlow("addon_set_config_failed") from err async def _async_install_addon(self) -> None: """Install the Z-Wave JS add-on.""" addon_manager: AddonManager = get_addon_manager(self.hass) await addon_manager.async_schedule_install_addon() async def _async_get_addon_discovery_info(self) -> dict: """Return add-on discovery info.""" addon_manager: AddonManager = get_addon_manager(self.hass) try: discovery_info_config = await addon_manager.async_get_addon_discovery_info() except AddonError as err: _LOGGER.error(err) raise AbortFlow("addon_get_discovery_info_failed") from err return discovery_info_config async def async_step_user( self, user_input: dict[str, Any] | None = None ) -> ConfigFlowResult: """Handle the initial step.""" if is_hassio(self.hass): return await self.async_step_on_supervisor() return await self.async_step_manual() async def async_step_reconfigure( self, user_input: dict[str, Any] | None = None ) -> ConfigFlowResult: """Confirm if we are migrating adapters or just re-configuring.""" self._reconfigure_config_entry = self._get_reconfigure_entry() return self.async_show_menu( step_id="reconfigure", menu_options=[ "intent_reconfigure", "intent_migrate", ], ) async def async_step_zeroconf( self, discovery_info: ZeroconfServiceInfo ) -> ConfigFlowResult: """Handle zeroconf discovery.""" home_id = str(discovery_info.properties["homeId"]) await self.async_set_unique_id(home_id) self._abort_if_unique_id_configured() self.ws_address = f"ws://{discovery_info.host}:{discovery_info.port}" self.context.update({"title_placeholders": {CONF_NAME: home_id}}) return await self.async_step_zeroconf_confirm() async def async_step_zeroconf_confirm( self, user_input: dict | None = None ) -> ConfigFlowResult: """Confirm the setup.""" if user_input is not None: return await self.async_step_manual({CONF_URL: self.ws_address}) assert self.ws_address assert self.unique_id return self.async_show_form( step_id="zeroconf_confirm", description_placeholders={ "home_id": self.unique_id, CONF_URL: self.ws_address[5:], }, ) async def async_step_usb(self, discovery_info: UsbServiceInfo) -> ConfigFlowResult: """Handle USB Discovery.""" if not is_hassio(self.hass): return self.async_abort(reason="discovery_requires_supervisor") if any( flow for flow in self._async_in_progress() if flow["context"].get("source") != SOURCE_USB ): # Allow multiple USB discovery flows to be in progress. # Migration requires more than one USB stick to be connected, # which can cause more than one discovery flow to be in progress, # at least for a short time. return self.async_abort(reason="already_in_progress") if current_config_entries := self._async_current_entries(include_ignore=False): config_entry = next( ( entry for entry in current_config_entries if entry.data.get(CONF_USE_ADDON) ), None, ) if not config_entry: return self.async_abort(reason="addon_required") vid = discovery_info.vid pid = discovery_info.pid serial_number = discovery_info.serial_number manufacturer = discovery_info.manufacturer description = discovery_info.description # Zooz uses this vid/pid, but so do 2652 sticks if vid == "10C4" and pid == "EA60" and description and "2652" in description: return self.async_abort(reason="not_zwave_device") discovery_info.device = await self.hass.async_add_executor_job( usb.get_serial_by_id, discovery_info.device ) addon_info = await self._async_get_addon_info() if ( addon_info.state not in (AddonState.NOT_INSTALLED, AddonState.INSTALLING) and (addon_device := addon_info.options.get(CONF_ADDON_DEVICE)) is not None and await self.hass.async_add_executor_job( usb.get_serial_by_id, addon_device ) == discovery_info.device ): return self.async_abort(reason="already_configured") await self.async_set_unique_id( f"{vid}:{pid}_{serial_number}_{manufacturer}_{description}" ) # We don't need to check if the unique_id is already configured # since we will update the unique_id before finishing the flow. # The unique_id set above is just a temporary value to avoid # duplicate discovery flows. dev_path = discovery_info.device self.usb_path = dev_path if manufacturer == "Nabu Casa" and description == "ZWA-2 - Nabu Casa ZWA-2": title = "Home Assistant Connect ZWA-2" else: human_name = usb.human_readable_device_name( dev_path, serial_number, manufacturer, description, vid, pid, ) title = human_name.split(" - ")[0].strip() self.context["title_placeholders"] = {CONF_NAME: title} self._title = title return await self.async_step_usb_confirm() async def async_step_usb_confirm( self, user_input: dict[str, Any] | None = None ) -> ConfigFlowResult: """Handle USB Discovery confirmation.""" if user_input is None: return self.async_show_form( step_id="usb_confirm", description_placeholders={CONF_NAME: self._title}, ) self._usb_discovery = True if current_config_entries := self._async_current_entries(include_ignore=False): self._reconfigure_config_entry = next( ( entry for entry in current_config_entries if entry.data.get(CONF_USE_ADDON) ), None, ) if not self._reconfigure_config_entry: return self.async_abort(reason="addon_required") return await self.async_step_intent_migrate() return await self.async_step_on_supervisor({CONF_USE_ADDON: True}) async def async_step_manual( self, user_input: dict[str, Any] | None = None ) -> ConfigFlowResult: """Handle a manual configuration.""" if user_input is None: return self.async_show_form( step_id="manual", data_schema=get_manual_schema({}) ) errors = {} try: version_info = await validate_input(self.hass, user_input) except InvalidInput as err: errors["base"] = err.error except Exception: _LOGGER.exception("Unexpected exception") errors["base"] = "unknown" else: await self.async_set_unique_id( str(version_info.home_id), raise_on_progress=False ) # Make sure we disable any add-on handling # if the controller is reconfigured in a manual step. self._abort_if_unique_id_configured( updates={ **user_input, CONF_USE_ADDON: False, CONF_INTEGRATION_CREATED_ADDON: False, } ) self.ws_address = user_input[CONF_URL] return self._async_create_entry_from_vars() return self.async_show_form( step_id="manual", data_schema=get_manual_schema(user_input), errors=errors ) async def async_step_hassio( self, discovery_info: HassioServiceInfo ) -> ConfigFlowResult: """Receive configuration from add-on discovery info. This flow is triggered by the Z-Wave JS add-on. """ if self._async_in_progress(): return self.async_abort(reason="already_in_progress") if discovery_info.slug != ADDON_SLUG: return self.async_abort(reason="not_zwave_js_addon") self.ws_address = ( f"ws://{discovery_info.config['host']}:{discovery_info.config['port']}" ) try: version_info = await async_get_version_info(self.hass, self.ws_address) except CannotConnect: return self.async_abort(reason="cannot_connect") await self.async_set_unique_id(str(version_info.home_id)) self._abort_if_unique_id_configured(updates={CONF_URL: self.ws_address}) return await self.async_step_hassio_confirm() async def async_step_hassio_confirm( self, user_input: dict[str, Any] | None = None ) -> ConfigFlowResult: """Confirm the add-on discovery.""" if user_input is not None: return await self.async_step_on_supervisor( user_input={CONF_USE_ADDON: True} ) return self.async_show_form(step_id="hassio_confirm") async def async_step_on_supervisor( self, user_input: dict[str, Any] | None = None ) -> ConfigFlowResult: """Handle logic when on Supervisor host.""" if user_input is None: return self.async_show_form( step_id="on_supervisor", data_schema=ON_SUPERVISOR_SCHEMA ) if not user_input[CONF_USE_ADDON]: return await self.async_step_manual() self.use_addon = True addon_info = await self._async_get_addon_info() if addon_info.state == AddonState.RUNNING: addon_config = addon_info.options self.usb_path = addon_config[CONF_ADDON_DEVICE] self.s0_legacy_key = addon_config.get(CONF_ADDON_S0_LEGACY_KEY, "") self.s2_access_control_key = addon_config.get( CONF_ADDON_S2_ACCESS_CONTROL_KEY, "" ) self.s2_authenticated_key = addon_config.get( CONF_ADDON_S2_AUTHENTICATED_KEY, "" ) self.s2_unauthenticated_key = addon_config.get( CONF_ADDON_S2_UNAUTHENTICATED_KEY, "" ) self.lr_s2_access_control_key = addon_config.get( CONF_ADDON_LR_S2_ACCESS_CONTROL_KEY, "" ) self.lr_s2_authenticated_key = addon_config.get( CONF_ADDON_LR_S2_AUTHENTICATED_KEY, "" ) return await self.async_step_finish_addon_setup_user() if addon_info.state == AddonState.NOT_RUNNING: return await self.async_step_configure_addon_user() return await self.async_step_install_addon() async def async_step_configure_addon_user( self, user_input: dict[str, Any] | None = None ) -> ConfigFlowResult: """Ask for config for Z-Wave JS add-on.""" addon_info = await self._async_get_addon_info() addon_config = addon_info.options if user_input is not None: self.s0_legacy_key = user_input[CONF_S0_LEGACY_KEY] self.s2_access_control_key = user_input[CONF_S2_ACCESS_CONTROL_KEY] self.s2_authenticated_key = user_input[CONF_S2_AUTHENTICATED_KEY] self.s2_unauthenticated_key = user_input[CONF_S2_UNAUTHENTICATED_KEY] self.lr_s2_access_control_key = user_input[CONF_LR_S2_ACCESS_CONTROL_KEY] self.lr_s2_authenticated_key = user_input[CONF_LR_S2_AUTHENTICATED_KEY] if not self._usb_discovery: self.usb_path = user_input[CONF_USB_PATH] addon_config_updates = { CONF_ADDON_DEVICE: self.usb_path, CONF_ADDON_S0_LEGACY_KEY: self.s0_legacy_key, CONF_ADDON_S2_ACCESS_CONTROL_KEY: self.s2_access_control_key, CONF_ADDON_S2_AUTHENTICATED_KEY: self.s2_authenticated_key, CONF_ADDON_S2_UNAUTHENTICATED_KEY: self.s2_unauthenticated_key, CONF_ADDON_LR_S2_ACCESS_CONTROL_KEY: self.lr_s2_access_control_key, CONF_ADDON_LR_S2_AUTHENTICATED_KEY: self.lr_s2_authenticated_key, } await self._async_set_addon_config(addon_config_updates) return await self.async_step_start_addon() usb_path = self.usb_path or addon_config.get(CONF_ADDON_DEVICE) or "" s0_legacy_key = addon_config.get( CONF_ADDON_S0_LEGACY_KEY, self.s0_legacy_key or "" ) s2_access_control_key = addon_config.get( CONF_ADDON_S2_ACCESS_CONTROL_KEY, self.s2_access_control_key or "" ) s2_authenticated_key = addon_config.get( CONF_ADDON_S2_AUTHENTICATED_KEY, self.s2_authenticated_key or "" ) s2_unauthenticated_key = addon_config.get( CONF_ADDON_S2_UNAUTHENTICATED_KEY, self.s2_unauthenticated_key or "" ) lr_s2_access_control_key = addon_config.get( CONF_ADDON_LR_S2_ACCESS_CONTROL_KEY, self.lr_s2_access_control_key or "" ) lr_s2_authenticated_key = addon_config.get( CONF_ADDON_LR_S2_AUTHENTICATED_KEY, self.lr_s2_authenticated_key or "" ) schema: VolDictType = { vol.Optional(CONF_S0_LEGACY_KEY, default=s0_legacy_key): str, vol.Optional( CONF_S2_ACCESS_CONTROL_KEY, default=s2_access_control_key ): str, vol.Optional(CONF_S2_AUTHENTICATED_KEY, default=s2_authenticated_key): str, vol.Optional( CONF_S2_UNAUTHENTICATED_KEY, default=s2_unauthenticated_key ): str, vol.Optional( CONF_LR_S2_ACCESS_CONTROL_KEY, default=lr_s2_access_control_key ): str, vol.Optional( CONF_LR_S2_AUTHENTICATED_KEY, default=lr_s2_authenticated_key ): str, } if not self._usb_discovery: try: ports = await async_get_usb_ports(self.hass) except OSError as err: _LOGGER.error("Failed to get USB ports: %s", err) return self.async_abort(reason="usb_ports_failed") schema = { vol.Required(CONF_USB_PATH, default=usb_path): vol.In(ports), **schema, } data_schema = vol.Schema(schema) return self.async_show_form( step_id="configure_addon_user", data_schema=data_schema ) async def async_step_finish_addon_setup_user( self, user_input: dict[str, Any] | None = None ) -> ConfigFlowResult: """Prepare info needed to complete the config entry. Get add-on discovery info and server version info. Set unique id and abort if already configured. """ if not self.ws_address: discovery_info = await self._async_get_addon_discovery_info() self.ws_address = f"ws://{discovery_info['host']}:{discovery_info['port']}" if not self.unique_id or self.source == SOURCE_USB: if not self.version_info: try: self.version_info = await async_get_version_info( self.hass, self.ws_address ) except CannotConnect as err: raise AbortFlow("cannot_connect") from err await self.async_set_unique_id( str(self.version_info.home_id), raise_on_progress=False ) self._abort_if_unique_id_configured( updates={ CONF_URL: self.ws_address, CONF_USB_PATH: self.usb_path, CONF_S0_LEGACY_KEY: self.s0_legacy_key, CONF_S2_ACCESS_CONTROL_KEY: self.s2_access_control_key, CONF_S2_AUTHENTICATED_KEY: self.s2_authenticated_key, CONF_S2_UNAUTHENTICATED_KEY: self.s2_unauthenticated_key, CONF_LR_S2_ACCESS_CONTROL_KEY: self.lr_s2_access_control_key, CONF_LR_S2_AUTHENTICATED_KEY: self.lr_s2_authenticated_key, } ) return self._async_create_entry_from_vars() @callback def _async_create_entry_from_vars(self) -> ConfigFlowResult: """Return a config entry for the flow.""" # Abort any other flows that may be in progress for progress in self._async_in_progress(): self.hass.config_entries.flow.async_abort(progress["flow_id"]) return self.async_create_entry( title=TITLE, data={ CONF_URL: self.ws_address, CONF_USB_PATH: self.usb_path, CONF_S0_LEGACY_KEY: self.s0_legacy_key, CONF_S2_ACCESS_CONTROL_KEY: self.s2_access_control_key, CONF_S2_AUTHENTICATED_KEY: self.s2_authenticated_key, CONF_S2_UNAUTHENTICATED_KEY: self.s2_unauthenticated_key, CONF_LR_S2_ACCESS_CONTROL_KEY: self.lr_s2_access_control_key, CONF_LR_S2_AUTHENTICATED_KEY: self.lr_s2_authenticated_key, CONF_USE_ADDON: self.use_addon, CONF_INTEGRATION_CREATED_ADDON: self.integration_created_addon, }, ) @callback def _async_update_entry( self, updates: dict[str, Any], *, schedule_reload: bool = True ) -> None: """Update the config entry with new data.""" config_entry = self._reconfigure_config_entry assert config_entry is not None self.hass.config_entries.async_update_entry( config_entry, data=config_entry.data | updates ) if schedule_reload: self.hass.config_entries.async_schedule_reload(config_entry.entry_id) async def async_step_intent_reconfigure( self, user_input: dict[str, Any] | None = None ) -> ConfigFlowResult: """Manage the options.""" if is_hassio(self.hass): return await self.async_step_on_supervisor_reconfigure() return await self.async_step_manual_reconfigure() async def async_step_intent_migrate( self, user_input: dict[str, Any] | None = None ) -> ConfigFlowResult: """Confirm the user wants to reset their current controller.""" config_entry = self._reconfigure_config_entry assert config_entry is not None if not self._usb_discovery and not config_entry.data.get(CONF_USE_ADDON): return self.async_abort(reason="addon_required") try: driver = self._get_driver() except AbortFlow: return self.async_abort(reason="config_entry_not_loaded") if ( sdk_version := driver.controller.sdk_version ) is not None and sdk_version < MIN_MIGRATION_SDK_VERSION: _LOGGER.warning( "Migration from this controller that has SDK version %s " "is not supported. If possible, update the firmware " "of the controller to a firmware built using SDK version %s or higher", sdk_version, MIN_MIGRATION_SDK_VERSION, ) return self.async_abort( reason="migration_low_sdk_version", description_placeholders={ "ok_sdk_version": str(MIN_MIGRATION_SDK_VERSION) }, ) if user_input is not None: self._migrating = True return await self.async_step_backup_nvm() return self.async_show_form(step_id="intent_migrate") async def async_step_backup_nvm( self, user_input: dict[str, Any] | None = None ) -> ConfigFlowResult: """Backup the current network.""" if self.backup_task is None: self.backup_task = self.hass.async_create_task(self._async_backup_network()) if not self.backup_task.done(): return self.async_show_progress( step_id="backup_nvm", progress_action="backup_nvm", progress_task=self.backup_task, ) try: await self.backup_task except AbortFlow as err: _LOGGER.error(err) return self.async_show_progress_done(next_step_id="backup_failed") finally: self.backup_task = None return self.async_show_progress_done(next_step_id="instruct_unplug") async def async_step_restore_nvm( self, user_input: dict[str, Any] | None = None ) -> ConfigFlowResult: """Restore the backup.""" if self.restore_backup_task is None: self.restore_backup_task = self.hass.async_create_task( self._async_restore_network_backup() ) if not self.restore_backup_task.done(): return self.async_show_progress( step_id="restore_nvm", progress_action="restore_nvm", progress_task=self.restore_backup_task, ) try: await self.restore_backup_task except AbortFlow as err: _LOGGER.error(err) return self.async_show_progress_done(next_step_id="restore_failed") finally: self.restore_backup_task = None return self.async_show_progress_done(next_step_id="migration_done") async def async_step_instruct_unplug( self, user_input: dict[str, Any] | None = None ) -> ConfigFlowResult: """Reset the current controller, and instruct the user to unplug it.""" if user_input is not None: if self.usb_path: # USB discovery was used, so the device is already known. await self._async_set_addon_config({CONF_ADDON_DEVICE: self.usb_path}) return await self.async_step_start_addon() # Now that the old controller is gone, we can scan for serial ports again return await self.async_step_choose_serial_port() # reset the old controller try: await self._get_driver().async_hard_reset() except (AbortFlow, FailedCommand) as err: _LOGGER.error("Failed to reset controller: %s", err) return self.async_abort(reason="reset_failed") config_entry = self._reconfigure_config_entry assert config_entry is not None # Unload the config entry before asking the user to unplug the controller. await self.hass.config_entries.async_unload(config_entry.entry_id) return self.async_show_form( step_id="instruct_unplug", description_placeholders={ "file_path": str(self.backup_filepath), }, ) async def async_step_manual_reconfigure( self, user_input: dict[str, Any] | None = None ) -> ConfigFlowResult: """Handle a manual configuration.""" config_entry = self._reconfigure_config_entry assert config_entry is not None if user_input is None: return self.async_show_form( step_id="manual_reconfigure", data_schema=get_manual_schema({CONF_URL: config_entry.data[CONF_URL]}), ) errors = {} try: version_info = await validate_input(self.hass, user_input) except InvalidInput as err: errors["base"] = err.error except Exception: _LOGGER.exception("Unexpected exception") errors["base"] = "unknown" else: if config_entry.unique_id != str(version_info.home_id): return self.async_abort(reason="different_device") # Make sure we disable any add-on handling # if the controller is reconfigured in a manual step. self._async_update_entry( { **user_input, CONF_USE_ADDON: False, CONF_INTEGRATION_CREATED_ADDON: False, } ) return self.async_abort(reason="reconfigure_successful") return self.async_show_form( step_id="manual_reconfigure", data_schema=get_manual_schema(user_input), errors=errors, ) async def async_step_on_supervisor_reconfigure( self, user_input: dict[str, Any] | None = None ) -> ConfigFlowResult: """Handle logic when on Supervisor host.""" config_entry = self._reconfigure_config_entry assert config_entry is not None if user_input is None: return self.async_show_form( step_id="on_supervisor_reconfigure", data_schema=get_on_supervisor_schema( {CONF_USE_ADDON: config_entry.data.get(CONF_USE_ADDON, True)} ), ) if not user_input[CONF_USE_ADDON]: if config_entry.data.get(CONF_USE_ADDON): # Unload the config entry before stopping the add-on. await self.hass.config_entries.async_unload(config_entry.entry_id) addon_manager = get_addon_manager(self.hass) _LOGGER.debug("Stopping Z-Wave JS add-on") try: await addon_manager.async_stop_addon() except AddonError as err: _LOGGER.error(err) self.hass.config_entries.async_schedule_reload( config_entry.entry_id ) raise AbortFlow("addon_stop_failed") from err return await self.async_step_manual_reconfigure() addon_info = await self._async_get_addon_info() if addon_info.state == AddonState.NOT_INSTALLED: return await self.async_step_install_addon() return await self.async_step_configure_addon_reconfigure() async def async_step_configure_addon_reconfigure( self, user_input: dict[str, Any] | None = None ) -> ConfigFlowResult: """Ask for config for Z-Wave JS add-on.""" addon_info = await self._async_get_addon_info() addon_config = addon_info.options if user_input is not None: self.s0_legacy_key = user_input[CONF_S0_LEGACY_KEY] self.s2_access_control_key = user_input[CONF_S2_ACCESS_CONTROL_KEY] self.s2_authenticated_key = user_input[CONF_S2_AUTHENTICATED_KEY] self.s2_unauthenticated_key = user_input[CONF_S2_UNAUTHENTICATED_KEY] self.lr_s2_access_control_key = user_input[CONF_LR_S2_ACCESS_CONTROL_KEY] self.lr_s2_authenticated_key = user_input[CONF_LR_S2_AUTHENTICATED_KEY] self.usb_path = user_input[CONF_USB_PATH] addon_config_updates = { CONF_ADDON_DEVICE: self.usb_path, CONF_ADDON_S0_LEGACY_KEY: self.s0_legacy_key, CONF_ADDON_S2_ACCESS_CONTROL_KEY: self.s2_access_control_key, CONF_ADDON_S2_AUTHENTICATED_KEY: self.s2_authenticated_key, CONF_ADDON_S2_UNAUTHENTICATED_KEY: self.s2_unauthenticated_key, CONF_ADDON_LR_S2_ACCESS_CONTROL_KEY: self.lr_s2_access_control_key, CONF_ADDON_LR_S2_AUTHENTICATED_KEY: self.lr_s2_authenticated_key, CONF_ADDON_LOG_LEVEL: user_input[CONF_LOG_LEVEL], CONF_ADDON_EMULATE_HARDWARE: user_input.get( CONF_EMULATE_HARDWARE, False ), } await self._async_set_addon_config(addon_config_updates) if addon_info.state == AddonState.RUNNING and not self.restart_addon: return await self.async_step_finish_addon_setup_reconfigure() if ( config_entry := self._reconfigure_config_entry ) and config_entry.data.get(CONF_USE_ADDON): # Disconnect integration before restarting add-on. await self.hass.config_entries.async_unload(config_entry.entry_id) return await self.async_step_start_addon() usb_path = addon_config.get(CONF_ADDON_DEVICE, self.usb_path or "") s0_legacy_key = addon_config.get( CONF_ADDON_S0_LEGACY_KEY, self.s0_legacy_key or "" ) s2_access_control_key = addon_config.get( CONF_ADDON_S2_ACCESS_CONTROL_KEY, self.s2_access_control_key or "" ) s2_authenticated_key = addon_config.get( CONF_ADDON_S2_AUTHENTICATED_KEY, self.s2_authenticated_key or "" ) s2_unauthenticated_key = addon_config.get( CONF_ADDON_S2_UNAUTHENTICATED_KEY, self.s2_unauthenticated_key or "" ) lr_s2_access_control_key = addon_config.get( CONF_ADDON_LR_S2_ACCESS_CONTROL_KEY, self.lr_s2_access_control_key or "" ) lr_s2_authenticated_key = addon_config.get( CONF_ADDON_LR_S2_AUTHENTICATED_KEY, self.lr_s2_authenticated_key or "" ) log_level = addon_config.get(CONF_ADDON_LOG_LEVEL, "info") emulate_hardware = addon_config.get(CONF_ADDON_EMULATE_HARDWARE, False) try: ports = await async_get_usb_ports(self.hass) except OSError as err: _LOGGER.error("Failed to get USB ports: %s", err) return self.async_abort(reason="usb_ports_failed") data_schema = vol.Schema( { vol.Required(CONF_USB_PATH, default=usb_path): vol.In(ports), vol.Optional(CONF_S0_LEGACY_KEY, default=s0_legacy_key): str, vol.Optional( CONF_S2_ACCESS_CONTROL_KEY, default=s2_access_control_key ): str, vol.Optional( CONF_S2_AUTHENTICATED_KEY, default=s2_authenticated_key ): str, vol.Optional( CONF_S2_UNAUTHENTICATED_KEY, default=s2_unauthenticated_key ): str, vol.Optional( CONF_LR_S2_ACCESS_CONTROL_KEY, default=lr_s2_access_control_key ): str, vol.Optional( CONF_LR_S2_AUTHENTICATED_KEY, default=lr_s2_authenticated_key ): str, vol.Optional(CONF_LOG_LEVEL, default=log_level): vol.In( ADDON_LOG_LEVELS ), vol.Optional(CONF_EMULATE_HARDWARE, default=emulate_hardware): bool, } ) return self.async_show_form( step_id="configure_addon_reconfigure", data_schema=data_schema ) async def async_step_choose_serial_port( self, user_input: dict[str, Any] | None = None ) -> ConfigFlowResult: """Choose a serial port.""" if user_input is not None: self.usb_path = user_input[CONF_USB_PATH] await self._async_set_addon_config({CONF_ADDON_DEVICE: self.usb_path}) return await self.async_step_start_addon() try: ports = await async_get_usb_ports(self.hass) except OSError as err: _LOGGER.error("Failed to get USB ports: %s", err) return self.async_abort(reason="usb_ports_failed") data_schema = vol.Schema( { vol.Required(CONF_USB_PATH): vol.In(ports), } ) return self.async_show_form( step_id="choose_serial_port", data_schema=data_schema ) async def async_step_backup_failed( self, user_input: dict[str, Any] | None = None ) -> ConfigFlowResult: """Backup failed.""" return self.async_abort(reason="backup_failed") async def async_step_restore_failed( self, user_input: dict[str, Any] | None = None ) -> ConfigFlowResult: """Restore failed.""" if user_input is not None: return await self.async_step_restore_nvm() return self.async_show_form( step_id="restore_failed", description_placeholders={ "file_path": str(self.backup_filepath), }, ) async def async_step_migration_done( self, user_input: dict[str, Any] | None = None ) -> ConfigFlowResult: """Migration done.""" return self.async_abort(reason="migration_successful") async def async_step_finish_addon_setup_migrate( self, user_input: dict[str, Any] | None = None ) -> ConfigFlowResult: """Prepare info needed to complete the config entry update.""" ws_address = self.ws_address assert ws_address is not None version_info = self.version_info assert version_info is not None # We need to wait for the config entry to be reloaded, # before restoring the backup. # We will do this in the restore nvm progress task, # to get a nicer user experience. self._async_update_entry( { "unique_id": str(version_info.home_id), CONF_URL: ws_address, CONF_USB_PATH: self.usb_path, CONF_S0_LEGACY_KEY: self.s0_legacy_key, CONF_S2_ACCESS_CONTROL_KEY: self.s2_access_control_key, CONF_S2_AUTHENTICATED_KEY: self.s2_authenticated_key, CONF_S2_UNAUTHENTICATED_KEY: self.s2_unauthenticated_key, CONF_LR_S2_ACCESS_CONTROL_KEY: self.lr_s2_access_control_key, CONF_LR_S2_AUTHENTICATED_KEY: self.lr_s2_authenticated_key, CONF_USE_ADDON: True, CONF_INTEGRATION_CREATED_ADDON: self.integration_created_addon, }, schedule_reload=False, ) return await self.async_step_restore_nvm() async def async_step_finish_addon_setup_reconfigure( self, user_input: dict[str, Any] | None = None ) -> ConfigFlowResult: """Prepare info needed to complete the config entry update. Get add-on discovery info and server version info. Check for same unique id and abort if not the same unique id. """ config_entry = self._reconfigure_config_entry assert config_entry is not None if self.revert_reason: self.original_addon_config = None reason = self.revert_reason self.revert_reason = None return await self.async_revert_addon_config(reason=reason) if not self.ws_address: discovery_info = await self._async_get_addon_discovery_info() self.ws_address = f"ws://{discovery_info['host']}:{discovery_info['port']}" if not self.version_info: try: self.version_info = await async_get_version_info( self.hass, self.ws_address ) except CannotConnect: return await self.async_revert_addon_config(reason="cannot_connect") if config_entry.unique_id != str(self.version_info.home_id): return await self.async_revert_addon_config(reason="different_device") self._async_update_entry( { CONF_URL: self.ws_address, CONF_USB_PATH: self.usb_path, CONF_S0_LEGACY_KEY: self.s0_legacy_key, CONF_S2_ACCESS_CONTROL_KEY: self.s2_access_control_key, CONF_S2_AUTHENTICATED_KEY: self.s2_authenticated_key, CONF_S2_UNAUTHENTICATED_KEY: self.s2_unauthenticated_key, CONF_LR_S2_ACCESS_CONTROL_KEY: self.lr_s2_access_control_key, CONF_LR_S2_AUTHENTICATED_KEY: self.lr_s2_authenticated_key, CONF_USE_ADDON: True, CONF_INTEGRATION_CREATED_ADDON: self.integration_created_addon, } ) return self.async_abort(reason="reconfigure_successful") async def async_revert_addon_config(self, reason: str) -> ConfigFlowResult: """Abort the options flow. If the add-on options have been changed, revert those and restart add-on. """ # If reverting the add-on options failed, abort immediately. if self.revert_reason: _LOGGER.error( "Failed to revert add-on options before aborting flow, reason: %s", reason, ) if self.revert_reason or not self.original_addon_config: config_entry = self._reconfigure_config_entry assert config_entry is not None self.hass.config_entries.async_schedule_reload(config_entry.entry_id) return self.async_abort(reason=reason) self.revert_reason = reason addon_config_input = { ADDON_USER_INPUT_MAP[addon_key]: addon_val for addon_key, addon_val in self.original_addon_config.items() if addon_key in ADDON_USER_INPUT_MAP } _LOGGER.debug("Reverting add-on options, reason: %s", reason) return await self.async_step_configure_addon_reconfigure(addon_config_input) async def _async_backup_network(self) -> None: """Backup the current network.""" @callback def forward_progress(event: dict) -> None: """Forward progress events to frontend.""" self.async_update_progress(event["bytesRead"] / event["total"]) controller = self._get_driver().controller unsub = controller.on("nvm backup progress", forward_progress) try: self.backup_data = await controller.async_backup_nvm_raw() except FailedCommand as err: raise AbortFlow(f"Failed to backup network: {err}") from err finally: unsub() # save the backup to a file just in case self.backup_filepath = self.hass.config.path( f"zwavejs_nvm_backup_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.bin" ) try: await self.hass.async_add_executor_job( Path(self.backup_filepath).write_bytes, self.backup_data, ) except OSError as err: raise AbortFlow(f"Failed to save backup file: {err}") from err async def _async_restore_network_backup(self) -> None: """Restore the backup.""" assert self.backup_data is not None config_entry = self._reconfigure_config_entry assert config_entry is not None # Reload the config entry to reconnect the client after the addon restart await self.hass.config_entries.async_reload(config_entry.entry_id) @callback def forward_progress(event: dict) -> None: """Forward progress events to frontend.""" if event["event"] == "nvm convert progress": # assume convert is 50% of the total progress self.async_update_progress(event["bytesRead"] / event["total"] * 0.5) elif event["event"] == "nvm restore progress": # assume restore is the rest of the progress self.async_update_progress( event["bytesWritten"] / event["total"] * 0.5 + 0.5 ) @callback def set_driver_ready(event: dict) -> None: "Set the driver ready event." wait_driver_ready.set() driver = self._get_driver() controller = driver.controller wait_driver_ready = asyncio.Event() unsubs = [ controller.on("nvm convert progress", forward_progress), controller.on("nvm restore progress", forward_progress), driver.once("driver ready", set_driver_ready), ] try: await controller.async_restore_nvm(self.backup_data) except FailedCommand as err: raise AbortFlow(f"Failed to restore network: {err}") from err else: with suppress(TimeoutError): async with asyncio.timeout(RESTORE_NVM_DRIVER_READY_TIMEOUT): await wait_driver_ready.wait() await self.hass.config_entries.async_reload(config_entry.entry_id) finally: for unsub in unsubs: unsub() def _get_driver(self) -> Driver: """Get the driver from the config entry.""" config_entry = self._reconfigure_config_entry assert config_entry is not None if config_entry.state != ConfigEntryState.LOADED: raise AbortFlow("Configuration entry is not loaded") client: Client = config_entry.runtime_data[DATA_CLIENT] assert client.driver is not None return client.driver class CannotConnect(HomeAssistantError): """Indicate connection error.""" class InvalidInput(HomeAssistantError): """Error to indicate input data is invalid.""" def __init__(self, error: str) -> None: """Initialize error.""" super().__init__() self.error = error