1
0
mirror of https://github.com/home-assistant/core.git synced 2025-05-16 03:49:13 +00:00

1371 lines
53 KiB
Python

"""Config flow for Z-Wave JS integration."""
from __future__ import annotations
import asyncio
from contextlib import suppress
from datetime import datetime
import logging
from pathlib import Path
from typing import Any
import aiohttp
from awesomeversion import AwesomeVersion
from serial.tools import list_ports
import voluptuous as vol
from zwave_js_server.client import Client
from zwave_js_server.exceptions import FailedCommand
from zwave_js_server.model.driver import Driver
from zwave_js_server.version import VersionInfo, get_server_version
from homeassistant.components import usb
from homeassistant.components.hassio import (
AddonError,
AddonInfo,
AddonManager,
AddonState,
)
from homeassistant.config_entries import (
SOURCE_USB,
ConfigEntry,
ConfigEntryState,
ConfigFlow,
ConfigFlowResult,
)
from homeassistant.const import CONF_NAME, CONF_URL
from homeassistant.core import HomeAssistant, callback
from homeassistant.data_entry_flow import AbortFlow
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.hassio import is_hassio
from homeassistant.helpers.service_info.hassio import HassioServiceInfo
from homeassistant.helpers.service_info.usb import UsbServiceInfo
from homeassistant.helpers.service_info.zeroconf import ZeroconfServiceInfo
from homeassistant.helpers.typing import VolDictType
from .addon import get_addon_manager
from .const import (
ADDON_SLUG,
CONF_ADDON_DEVICE,
CONF_ADDON_EMULATE_HARDWARE,
CONF_ADDON_LOG_LEVEL,
CONF_ADDON_LR_S2_ACCESS_CONTROL_KEY,
CONF_ADDON_LR_S2_AUTHENTICATED_KEY,
CONF_ADDON_NETWORK_KEY,
CONF_ADDON_S0_LEGACY_KEY,
CONF_ADDON_S2_ACCESS_CONTROL_KEY,
CONF_ADDON_S2_AUTHENTICATED_KEY,
CONF_ADDON_S2_UNAUTHENTICATED_KEY,
CONF_INTEGRATION_CREATED_ADDON,
CONF_LR_S2_ACCESS_CONTROL_KEY,
CONF_LR_S2_AUTHENTICATED_KEY,
CONF_S0_LEGACY_KEY,
CONF_S2_ACCESS_CONTROL_KEY,
CONF_S2_AUTHENTICATED_KEY,
CONF_S2_UNAUTHENTICATED_KEY,
CONF_USB_PATH,
CONF_USE_ADDON,
DATA_CLIENT,
DOMAIN,
RESTORE_NVM_DRIVER_READY_TIMEOUT,
)
_LOGGER = logging.getLogger(__name__)
DEFAULT_URL = "ws://localhost:3000"
TITLE = "Z-Wave JS"
ADDON_SETUP_TIMEOUT = 5
ADDON_SETUP_TIMEOUT_ROUNDS = 40
CONF_EMULATE_HARDWARE = "emulate_hardware"
CONF_LOG_LEVEL = "log_level"
SERVER_VERSION_TIMEOUT = 10
ADDON_LOG_LEVELS = {
"error": "Error",
"warn": "Warn",
"info": "Info",
"verbose": "Verbose",
"debug": "Debug",
"silly": "Silly",
}
ADDON_USER_INPUT_MAP = {
CONF_ADDON_DEVICE: CONF_USB_PATH,
CONF_ADDON_S0_LEGACY_KEY: CONF_S0_LEGACY_KEY,
CONF_ADDON_S2_ACCESS_CONTROL_KEY: CONF_S2_ACCESS_CONTROL_KEY,
CONF_ADDON_S2_AUTHENTICATED_KEY: CONF_S2_AUTHENTICATED_KEY,
CONF_ADDON_S2_UNAUTHENTICATED_KEY: CONF_S2_UNAUTHENTICATED_KEY,
CONF_ADDON_LR_S2_ACCESS_CONTROL_KEY: CONF_LR_S2_ACCESS_CONTROL_KEY,
CONF_ADDON_LR_S2_AUTHENTICATED_KEY: CONF_LR_S2_AUTHENTICATED_KEY,
CONF_ADDON_LOG_LEVEL: CONF_LOG_LEVEL,
CONF_ADDON_EMULATE_HARDWARE: CONF_EMULATE_HARDWARE,
}
ON_SUPERVISOR_SCHEMA = vol.Schema({vol.Optional(CONF_USE_ADDON, default=True): bool})
MIN_MIGRATION_SDK_VERSION = AwesomeVersion("6.61")
def get_manual_schema(user_input: dict[str, Any]) -> vol.Schema:
"""Return a schema for the manual step."""
default_url = user_input.get(CONF_URL, DEFAULT_URL)
return vol.Schema({vol.Required(CONF_URL, default=default_url): str})
def get_on_supervisor_schema(user_input: dict[str, Any]) -> vol.Schema:
"""Return a schema for the on Supervisor step."""
default_use_addon = user_input[CONF_USE_ADDON]
return vol.Schema({vol.Optional(CONF_USE_ADDON, default=default_use_addon): bool})
async def validate_input(hass: HomeAssistant, user_input: dict) -> VersionInfo:
"""Validate if the user input allows us to connect."""
ws_address = user_input[CONF_URL]
if not ws_address.startswith(("ws://", "wss://")):
raise InvalidInput("invalid_ws_url")
try:
return await async_get_version_info(hass, ws_address)
except CannotConnect as err:
raise InvalidInput("cannot_connect") from err
async def async_get_version_info(hass: HomeAssistant, ws_address: str) -> VersionInfo:
"""Return Z-Wave JS version info."""
try:
async with asyncio.timeout(SERVER_VERSION_TIMEOUT):
version_info: VersionInfo = await get_server_version(
ws_address, async_get_clientsession(hass)
)
except (TimeoutError, aiohttp.ClientError) as err:
# We don't want to spam the log if the add-on isn't started
# or takes a long time to start.
_LOGGER.debug("Failed to connect to Z-Wave JS server: %s", err)
raise CannotConnect from err
return version_info
def get_usb_ports() -> dict[str, str]:
"""Return a dict of USB ports and their friendly names."""
ports = list_ports.comports()
port_descriptions = {}
for port in ports:
vid: str | None = None
pid: str | None = None
if port.vid is not None and port.pid is not None:
usb_device = usb.usb_device_from_port(port)
vid = usb_device.vid
pid = usb_device.pid
dev_path = usb.get_serial_by_id(port.device)
human_name = usb.human_readable_device_name(
dev_path,
port.serial_number,
port.manufacturer,
port.description,
vid,
pid,
)
port_descriptions[dev_path] = human_name
return port_descriptions
async def async_get_usb_ports(hass: HomeAssistant) -> dict[str, str]:
"""Return a dict of USB ports and their friendly names."""
return await hass.async_add_executor_job(get_usb_ports)
class ZWaveJSConfigFlow(ConfigFlow, domain=DOMAIN):
"""Handle a config flow for Z-Wave JS."""
VERSION = 1
_title: str
def __init__(self) -> None:
"""Set up flow instance."""
self.s0_legacy_key: str | None = None
self.s2_access_control_key: str | None = None
self.s2_authenticated_key: str | None = None
self.s2_unauthenticated_key: str | None = None
self.lr_s2_access_control_key: str | None = None
self.lr_s2_authenticated_key: str | None = None
self.usb_path: str | None = None
self.ws_address: str | None = None
self.restart_addon: bool = False
# If we install the add-on we should uninstall it on entry remove.
self.integration_created_addon = False
self.install_task: asyncio.Task | None = None
self.start_task: asyncio.Task | None = None
self.version_info: VersionInfo | None = None
self.original_addon_config: dict[str, Any] | None = None
self.revert_reason: str | None = None
self.backup_task: asyncio.Task | None = None
self.restore_backup_task: asyncio.Task | None = None
self.backup_data: bytes | None = None
self.backup_filepath: str | None = None
self.use_addon = False
self._migrating = False
self._reconfigure_config_entry: ConfigEntry | None = None
self._usb_discovery = False
async def async_step_install_addon(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Install Z-Wave JS add-on."""
if not self.install_task:
self.install_task = self.hass.async_create_task(self._async_install_addon())
if not self.install_task.done():
return self.async_show_progress(
step_id="install_addon",
progress_action="install_addon",
progress_task=self.install_task,
)
try:
await self.install_task
except AddonError as err:
_LOGGER.error(err)
return self.async_show_progress_done(next_step_id="install_failed")
finally:
self.install_task = None
self.integration_created_addon = True
return self.async_show_progress_done(next_step_id="configure_addon")
async def async_step_install_failed(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Add-on installation failed."""
return self.async_abort(reason="addon_install_failed")
async def async_step_start_addon(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Start Z-Wave JS add-on."""
if not self.start_task:
self.start_task = self.hass.async_create_task(self._async_start_addon())
if not self.start_task.done():
return self.async_show_progress(
step_id="start_addon",
progress_action="start_addon",
progress_task=self.start_task,
)
try:
await self.start_task
except (CannotConnect, AddonError, AbortFlow) as err:
_LOGGER.error(err)
return self.async_show_progress_done(next_step_id="start_failed")
finally:
self.start_task = None
return self.async_show_progress_done(next_step_id="finish_addon_setup")
async def async_step_start_failed(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Add-on start failed."""
if self._migrating:
return self.async_abort(reason="addon_start_failed")
if self._reconfigure_config_entry:
return await self.async_revert_addon_config(reason="addon_start_failed")
return self.async_abort(reason="addon_start_failed")
async def _async_start_addon(self) -> None:
"""Start the Z-Wave JS add-on."""
addon_manager: AddonManager = get_addon_manager(self.hass)
self.version_info = None
if self.restart_addon:
await addon_manager.async_schedule_restart_addon()
else:
await addon_manager.async_schedule_start_addon()
# Sleep some seconds to let the add-on start properly before connecting.
for _ in range(ADDON_SETUP_TIMEOUT_ROUNDS):
await asyncio.sleep(ADDON_SETUP_TIMEOUT)
try:
if not self.ws_address:
discovery_info = await self._async_get_addon_discovery_info()
self.ws_address = (
f"ws://{discovery_info['host']}:{discovery_info['port']}"
)
self.version_info = await async_get_version_info(
self.hass, self.ws_address
)
except (AbortFlow, CannotConnect) as err:
_LOGGER.debug(
"Add-on not ready yet, waiting %s seconds: %s",
ADDON_SETUP_TIMEOUT,
err,
)
else:
break
else:
raise CannotConnect("Failed to start Z-Wave JS add-on: timeout")
async def async_step_configure_addon(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Ask for config for Z-Wave JS add-on."""
if self._reconfigure_config_entry:
return await self.async_step_configure_addon_reconfigure(user_input)
return await self.async_step_configure_addon_user(user_input)
async def async_step_finish_addon_setup(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Prepare info needed to complete the config entry.
Get add-on discovery info and server version info.
Set unique id and abort if already configured.
"""
if self._migrating:
return await self.async_step_finish_addon_setup_migrate(user_input)
if self._reconfigure_config_entry:
return await self.async_step_finish_addon_setup_reconfigure(user_input)
return await self.async_step_finish_addon_setup_user(user_input)
async def _async_get_addon_info(self) -> AddonInfo:
"""Return and cache Z-Wave JS add-on info."""
addon_manager: AddonManager = get_addon_manager(self.hass)
try:
addon_info: AddonInfo = await addon_manager.async_get_addon_info()
except AddonError as err:
_LOGGER.error(err)
raise AbortFlow("addon_info_failed") from err
return addon_info
async def _async_set_addon_config(self, config_updates: dict) -> None:
"""Set Z-Wave JS add-on config."""
addon_info = await self._async_get_addon_info()
addon_config = addon_info.options
new_addon_config = addon_config | config_updates
if new_addon_config == addon_config:
return
if addon_info.state == AddonState.RUNNING:
self.restart_addon = True
# Copy the add-on config to keep the objects separate.
self.original_addon_config = dict(addon_config)
# Remove legacy network_key
new_addon_config.pop(CONF_ADDON_NETWORK_KEY, None)
addon_manager: AddonManager = get_addon_manager(self.hass)
try:
await addon_manager.async_set_addon_options(new_addon_config)
except AddonError as err:
_LOGGER.error(err)
raise AbortFlow("addon_set_config_failed") from err
async def _async_install_addon(self) -> None:
"""Install the Z-Wave JS add-on."""
addon_manager: AddonManager = get_addon_manager(self.hass)
await addon_manager.async_schedule_install_addon()
async def _async_get_addon_discovery_info(self) -> dict:
"""Return add-on discovery info."""
addon_manager: AddonManager = get_addon_manager(self.hass)
try:
discovery_info_config = await addon_manager.async_get_addon_discovery_info()
except AddonError as err:
_LOGGER.error(err)
raise AbortFlow("addon_get_discovery_info_failed") from err
return discovery_info_config
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle the initial step."""
if is_hassio(self.hass):
return await self.async_step_on_supervisor()
return await self.async_step_manual()
async def async_step_reconfigure(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Confirm if we are migrating adapters or just re-configuring."""
self._reconfigure_config_entry = self._get_reconfigure_entry()
return self.async_show_menu(
step_id="reconfigure",
menu_options=[
"intent_reconfigure",
"intent_migrate",
],
)
async def async_step_zeroconf(
self, discovery_info: ZeroconfServiceInfo
) -> ConfigFlowResult:
"""Handle zeroconf discovery."""
home_id = str(discovery_info.properties["homeId"])
await self.async_set_unique_id(home_id)
self._abort_if_unique_id_configured()
self.ws_address = f"ws://{discovery_info.host}:{discovery_info.port}"
self.context.update({"title_placeholders": {CONF_NAME: home_id}})
return await self.async_step_zeroconf_confirm()
async def async_step_zeroconf_confirm(
self, user_input: dict | None = None
) -> ConfigFlowResult:
"""Confirm the setup."""
if user_input is not None:
return await self.async_step_manual({CONF_URL: self.ws_address})
assert self.ws_address
assert self.unique_id
return self.async_show_form(
step_id="zeroconf_confirm",
description_placeholders={
"home_id": self.unique_id,
CONF_URL: self.ws_address[5:],
},
)
async def async_step_usb(self, discovery_info: UsbServiceInfo) -> ConfigFlowResult:
"""Handle USB Discovery."""
if not is_hassio(self.hass):
return self.async_abort(reason="discovery_requires_supervisor")
if any(
flow
for flow in self._async_in_progress()
if flow["context"].get("source") != SOURCE_USB
):
# Allow multiple USB discovery flows to be in progress.
# Migration requires more than one USB stick to be connected,
# which can cause more than one discovery flow to be in progress,
# at least for a short time.
return self.async_abort(reason="already_in_progress")
if current_config_entries := self._async_current_entries(include_ignore=False):
config_entry = next(
(
entry
for entry in current_config_entries
if entry.data.get(CONF_USE_ADDON)
),
None,
)
if not config_entry:
return self.async_abort(reason="addon_required")
vid = discovery_info.vid
pid = discovery_info.pid
serial_number = discovery_info.serial_number
manufacturer = discovery_info.manufacturer
description = discovery_info.description
# Zooz uses this vid/pid, but so do 2652 sticks
if vid == "10C4" and pid == "EA60" and description and "2652" in description:
return self.async_abort(reason="not_zwave_device")
discovery_info.device = await self.hass.async_add_executor_job(
usb.get_serial_by_id, discovery_info.device
)
addon_info = await self._async_get_addon_info()
if (
addon_info.state not in (AddonState.NOT_INSTALLED, AddonState.INSTALLING)
and (addon_device := addon_info.options.get(CONF_ADDON_DEVICE)) is not None
and await self.hass.async_add_executor_job(
usb.get_serial_by_id, addon_device
)
== discovery_info.device
):
return self.async_abort(reason="already_configured")
await self.async_set_unique_id(
f"{vid}:{pid}_{serial_number}_{manufacturer}_{description}"
)
# We don't need to check if the unique_id is already configured
# since we will update the unique_id before finishing the flow.
# The unique_id set above is just a temporary value to avoid
# duplicate discovery flows.
dev_path = discovery_info.device
self.usb_path = dev_path
if manufacturer == "Nabu Casa" and description == "ZWA-2 - Nabu Casa ZWA-2":
title = "Home Assistant Connect ZWA-2"
else:
human_name = usb.human_readable_device_name(
dev_path,
serial_number,
manufacturer,
description,
vid,
pid,
)
title = human_name.split(" - ")[0].strip()
self.context["title_placeholders"] = {CONF_NAME: title}
self._title = title
return await self.async_step_usb_confirm()
async def async_step_usb_confirm(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle USB Discovery confirmation."""
if user_input is None:
return self.async_show_form(
step_id="usb_confirm",
description_placeholders={CONF_NAME: self._title},
)
self._usb_discovery = True
if current_config_entries := self._async_current_entries(include_ignore=False):
self._reconfigure_config_entry = next(
(
entry
for entry in current_config_entries
if entry.data.get(CONF_USE_ADDON)
),
None,
)
if not self._reconfigure_config_entry:
return self.async_abort(reason="addon_required")
return await self.async_step_intent_migrate()
return await self.async_step_on_supervisor({CONF_USE_ADDON: True})
async def async_step_manual(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle a manual configuration."""
if user_input is None:
return self.async_show_form(
step_id="manual", data_schema=get_manual_schema({})
)
errors = {}
try:
version_info = await validate_input(self.hass, user_input)
except InvalidInput as err:
errors["base"] = err.error
except Exception:
_LOGGER.exception("Unexpected exception")
errors["base"] = "unknown"
else:
await self.async_set_unique_id(
str(version_info.home_id), raise_on_progress=False
)
# Make sure we disable any add-on handling
# if the controller is reconfigured in a manual step.
self._abort_if_unique_id_configured(
updates={
**user_input,
CONF_USE_ADDON: False,
CONF_INTEGRATION_CREATED_ADDON: False,
}
)
self.ws_address = user_input[CONF_URL]
return self._async_create_entry_from_vars()
return self.async_show_form(
step_id="manual", data_schema=get_manual_schema(user_input), errors=errors
)
async def async_step_hassio(
self, discovery_info: HassioServiceInfo
) -> ConfigFlowResult:
"""Receive configuration from add-on discovery info.
This flow is triggered by the Z-Wave JS add-on.
"""
if self._async_in_progress():
return self.async_abort(reason="already_in_progress")
if discovery_info.slug != ADDON_SLUG:
return self.async_abort(reason="not_zwave_js_addon")
self.ws_address = (
f"ws://{discovery_info.config['host']}:{discovery_info.config['port']}"
)
try:
version_info = await async_get_version_info(self.hass, self.ws_address)
except CannotConnect:
return self.async_abort(reason="cannot_connect")
await self.async_set_unique_id(str(version_info.home_id))
self._abort_if_unique_id_configured(updates={CONF_URL: self.ws_address})
return await self.async_step_hassio_confirm()
async def async_step_hassio_confirm(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Confirm the add-on discovery."""
if user_input is not None:
return await self.async_step_on_supervisor(
user_input={CONF_USE_ADDON: True}
)
return self.async_show_form(step_id="hassio_confirm")
async def async_step_on_supervisor(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle logic when on Supervisor host."""
if user_input is None:
return self.async_show_form(
step_id="on_supervisor", data_schema=ON_SUPERVISOR_SCHEMA
)
if not user_input[CONF_USE_ADDON]:
return await self.async_step_manual()
self.use_addon = True
addon_info = await self._async_get_addon_info()
if addon_info.state == AddonState.RUNNING:
addon_config = addon_info.options
self.usb_path = addon_config[CONF_ADDON_DEVICE]
self.s0_legacy_key = addon_config.get(CONF_ADDON_S0_LEGACY_KEY, "")
self.s2_access_control_key = addon_config.get(
CONF_ADDON_S2_ACCESS_CONTROL_KEY, ""
)
self.s2_authenticated_key = addon_config.get(
CONF_ADDON_S2_AUTHENTICATED_KEY, ""
)
self.s2_unauthenticated_key = addon_config.get(
CONF_ADDON_S2_UNAUTHENTICATED_KEY, ""
)
self.lr_s2_access_control_key = addon_config.get(
CONF_ADDON_LR_S2_ACCESS_CONTROL_KEY, ""
)
self.lr_s2_authenticated_key = addon_config.get(
CONF_ADDON_LR_S2_AUTHENTICATED_KEY, ""
)
return await self.async_step_finish_addon_setup_user()
if addon_info.state == AddonState.NOT_RUNNING:
return await self.async_step_configure_addon_user()
return await self.async_step_install_addon()
async def async_step_configure_addon_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Ask for config for Z-Wave JS add-on."""
addon_info = await self._async_get_addon_info()
addon_config = addon_info.options
if user_input is not None:
self.s0_legacy_key = user_input[CONF_S0_LEGACY_KEY]
self.s2_access_control_key = user_input[CONF_S2_ACCESS_CONTROL_KEY]
self.s2_authenticated_key = user_input[CONF_S2_AUTHENTICATED_KEY]
self.s2_unauthenticated_key = user_input[CONF_S2_UNAUTHENTICATED_KEY]
self.lr_s2_access_control_key = user_input[CONF_LR_S2_ACCESS_CONTROL_KEY]
self.lr_s2_authenticated_key = user_input[CONF_LR_S2_AUTHENTICATED_KEY]
if not self._usb_discovery:
self.usb_path = user_input[CONF_USB_PATH]
addon_config_updates = {
CONF_ADDON_DEVICE: self.usb_path,
CONF_ADDON_S0_LEGACY_KEY: self.s0_legacy_key,
CONF_ADDON_S2_ACCESS_CONTROL_KEY: self.s2_access_control_key,
CONF_ADDON_S2_AUTHENTICATED_KEY: self.s2_authenticated_key,
CONF_ADDON_S2_UNAUTHENTICATED_KEY: self.s2_unauthenticated_key,
CONF_ADDON_LR_S2_ACCESS_CONTROL_KEY: self.lr_s2_access_control_key,
CONF_ADDON_LR_S2_AUTHENTICATED_KEY: self.lr_s2_authenticated_key,
}
await self._async_set_addon_config(addon_config_updates)
return await self.async_step_start_addon()
usb_path = self.usb_path or addon_config.get(CONF_ADDON_DEVICE) or ""
s0_legacy_key = addon_config.get(
CONF_ADDON_S0_LEGACY_KEY, self.s0_legacy_key or ""
)
s2_access_control_key = addon_config.get(
CONF_ADDON_S2_ACCESS_CONTROL_KEY, self.s2_access_control_key or ""
)
s2_authenticated_key = addon_config.get(
CONF_ADDON_S2_AUTHENTICATED_KEY, self.s2_authenticated_key or ""
)
s2_unauthenticated_key = addon_config.get(
CONF_ADDON_S2_UNAUTHENTICATED_KEY, self.s2_unauthenticated_key or ""
)
lr_s2_access_control_key = addon_config.get(
CONF_ADDON_LR_S2_ACCESS_CONTROL_KEY, self.lr_s2_access_control_key or ""
)
lr_s2_authenticated_key = addon_config.get(
CONF_ADDON_LR_S2_AUTHENTICATED_KEY, self.lr_s2_authenticated_key or ""
)
schema: VolDictType = {
vol.Optional(CONF_S0_LEGACY_KEY, default=s0_legacy_key): str,
vol.Optional(
CONF_S2_ACCESS_CONTROL_KEY, default=s2_access_control_key
): str,
vol.Optional(CONF_S2_AUTHENTICATED_KEY, default=s2_authenticated_key): str,
vol.Optional(
CONF_S2_UNAUTHENTICATED_KEY, default=s2_unauthenticated_key
): str,
vol.Optional(
CONF_LR_S2_ACCESS_CONTROL_KEY, default=lr_s2_access_control_key
): str,
vol.Optional(
CONF_LR_S2_AUTHENTICATED_KEY, default=lr_s2_authenticated_key
): str,
}
if not self._usb_discovery:
try:
ports = await async_get_usb_ports(self.hass)
except OSError as err:
_LOGGER.error("Failed to get USB ports: %s", err)
return self.async_abort(reason="usb_ports_failed")
schema = {
vol.Required(CONF_USB_PATH, default=usb_path): vol.In(ports),
**schema,
}
data_schema = vol.Schema(schema)
return self.async_show_form(
step_id="configure_addon_user", data_schema=data_schema
)
async def async_step_finish_addon_setup_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Prepare info needed to complete the config entry.
Get add-on discovery info and server version info.
Set unique id and abort if already configured.
"""
if not self.ws_address:
discovery_info = await self._async_get_addon_discovery_info()
self.ws_address = f"ws://{discovery_info['host']}:{discovery_info['port']}"
if not self.unique_id or self.source == SOURCE_USB:
if not self.version_info:
try:
self.version_info = await async_get_version_info(
self.hass, self.ws_address
)
except CannotConnect as err:
raise AbortFlow("cannot_connect") from err
await self.async_set_unique_id(
str(self.version_info.home_id), raise_on_progress=False
)
self._abort_if_unique_id_configured(
updates={
CONF_URL: self.ws_address,
CONF_USB_PATH: self.usb_path,
CONF_S0_LEGACY_KEY: self.s0_legacy_key,
CONF_S2_ACCESS_CONTROL_KEY: self.s2_access_control_key,
CONF_S2_AUTHENTICATED_KEY: self.s2_authenticated_key,
CONF_S2_UNAUTHENTICATED_KEY: self.s2_unauthenticated_key,
CONF_LR_S2_ACCESS_CONTROL_KEY: self.lr_s2_access_control_key,
CONF_LR_S2_AUTHENTICATED_KEY: self.lr_s2_authenticated_key,
}
)
return self._async_create_entry_from_vars()
@callback
def _async_create_entry_from_vars(self) -> ConfigFlowResult:
"""Return a config entry for the flow."""
# Abort any other flows that may be in progress
for progress in self._async_in_progress():
self.hass.config_entries.flow.async_abort(progress["flow_id"])
return self.async_create_entry(
title=TITLE,
data={
CONF_URL: self.ws_address,
CONF_USB_PATH: self.usb_path,
CONF_S0_LEGACY_KEY: self.s0_legacy_key,
CONF_S2_ACCESS_CONTROL_KEY: self.s2_access_control_key,
CONF_S2_AUTHENTICATED_KEY: self.s2_authenticated_key,
CONF_S2_UNAUTHENTICATED_KEY: self.s2_unauthenticated_key,
CONF_LR_S2_ACCESS_CONTROL_KEY: self.lr_s2_access_control_key,
CONF_LR_S2_AUTHENTICATED_KEY: self.lr_s2_authenticated_key,
CONF_USE_ADDON: self.use_addon,
CONF_INTEGRATION_CREATED_ADDON: self.integration_created_addon,
},
)
@callback
def _async_update_entry(
self, updates: dict[str, Any], *, schedule_reload: bool = True
) -> None:
"""Update the config entry with new data."""
config_entry = self._reconfigure_config_entry
assert config_entry is not None
self.hass.config_entries.async_update_entry(
config_entry, data=config_entry.data | updates
)
if schedule_reload:
self.hass.config_entries.async_schedule_reload(config_entry.entry_id)
async def async_step_intent_reconfigure(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Manage the options."""
if is_hassio(self.hass):
return await self.async_step_on_supervisor_reconfigure()
return await self.async_step_manual_reconfigure()
async def async_step_intent_migrate(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Confirm the user wants to reset their current controller."""
config_entry = self._reconfigure_config_entry
assert config_entry is not None
if not self._usb_discovery and not config_entry.data.get(CONF_USE_ADDON):
return self.async_abort(reason="addon_required")
try:
driver = self._get_driver()
except AbortFlow:
return self.async_abort(reason="config_entry_not_loaded")
if (
sdk_version := driver.controller.sdk_version
) is not None and sdk_version < MIN_MIGRATION_SDK_VERSION:
_LOGGER.warning(
"Migration from this controller that has SDK version %s "
"is not supported. If possible, update the firmware "
"of the controller to a firmware built using SDK version %s or higher",
sdk_version,
MIN_MIGRATION_SDK_VERSION,
)
return self.async_abort(
reason="migration_low_sdk_version",
description_placeholders={
"ok_sdk_version": str(MIN_MIGRATION_SDK_VERSION)
},
)
if user_input is not None:
self._migrating = True
return await self.async_step_backup_nvm()
return self.async_show_form(step_id="intent_migrate")
async def async_step_backup_nvm(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Backup the current network."""
if self.backup_task is None:
self.backup_task = self.hass.async_create_task(self._async_backup_network())
if not self.backup_task.done():
return self.async_show_progress(
step_id="backup_nvm",
progress_action="backup_nvm",
progress_task=self.backup_task,
)
try:
await self.backup_task
except AbortFlow as err:
_LOGGER.error(err)
return self.async_show_progress_done(next_step_id="backup_failed")
finally:
self.backup_task = None
return self.async_show_progress_done(next_step_id="instruct_unplug")
async def async_step_restore_nvm(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Restore the backup."""
if self.restore_backup_task is None:
self.restore_backup_task = self.hass.async_create_task(
self._async_restore_network_backup()
)
if not self.restore_backup_task.done():
return self.async_show_progress(
step_id="restore_nvm",
progress_action="restore_nvm",
progress_task=self.restore_backup_task,
)
try:
await self.restore_backup_task
except AbortFlow as err:
_LOGGER.error(err)
return self.async_show_progress_done(next_step_id="restore_failed")
finally:
self.restore_backup_task = None
return self.async_show_progress_done(next_step_id="migration_done")
async def async_step_instruct_unplug(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Reset the current controller, and instruct the user to unplug it."""
if user_input is not None:
if self.usb_path:
# USB discovery was used, so the device is already known.
await self._async_set_addon_config({CONF_ADDON_DEVICE: self.usb_path})
return await self.async_step_start_addon()
# Now that the old controller is gone, we can scan for serial ports again
return await self.async_step_choose_serial_port()
# reset the old controller
try:
await self._get_driver().async_hard_reset()
except (AbortFlow, FailedCommand) as err:
_LOGGER.error("Failed to reset controller: %s", err)
return self.async_abort(reason="reset_failed")
config_entry = self._reconfigure_config_entry
assert config_entry is not None
# Unload the config entry before asking the user to unplug the controller.
await self.hass.config_entries.async_unload(config_entry.entry_id)
return self.async_show_form(
step_id="instruct_unplug",
description_placeholders={
"file_path": str(self.backup_filepath),
},
)
async def async_step_manual_reconfigure(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle a manual configuration."""
config_entry = self._reconfigure_config_entry
assert config_entry is not None
if user_input is None:
return self.async_show_form(
step_id="manual_reconfigure",
data_schema=get_manual_schema({CONF_URL: config_entry.data[CONF_URL]}),
)
errors = {}
try:
version_info = await validate_input(self.hass, user_input)
except InvalidInput as err:
errors["base"] = err.error
except Exception:
_LOGGER.exception("Unexpected exception")
errors["base"] = "unknown"
else:
if config_entry.unique_id != str(version_info.home_id):
return self.async_abort(reason="different_device")
# Make sure we disable any add-on handling
# if the controller is reconfigured in a manual step.
self._async_update_entry(
{
**user_input,
CONF_USE_ADDON: False,
CONF_INTEGRATION_CREATED_ADDON: False,
}
)
return self.async_abort(reason="reconfigure_successful")
return self.async_show_form(
step_id="manual_reconfigure",
data_schema=get_manual_schema(user_input),
errors=errors,
)
async def async_step_on_supervisor_reconfigure(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle logic when on Supervisor host."""
config_entry = self._reconfigure_config_entry
assert config_entry is not None
if user_input is None:
return self.async_show_form(
step_id="on_supervisor_reconfigure",
data_schema=get_on_supervisor_schema(
{CONF_USE_ADDON: config_entry.data.get(CONF_USE_ADDON, True)}
),
)
if not user_input[CONF_USE_ADDON]:
if config_entry.data.get(CONF_USE_ADDON):
# Unload the config entry before stopping the add-on.
await self.hass.config_entries.async_unload(config_entry.entry_id)
addon_manager = get_addon_manager(self.hass)
_LOGGER.debug("Stopping Z-Wave JS add-on")
try:
await addon_manager.async_stop_addon()
except AddonError as err:
_LOGGER.error(err)
self.hass.config_entries.async_schedule_reload(
config_entry.entry_id
)
raise AbortFlow("addon_stop_failed") from err
return await self.async_step_manual_reconfigure()
addon_info = await self._async_get_addon_info()
if addon_info.state == AddonState.NOT_INSTALLED:
return await self.async_step_install_addon()
return await self.async_step_configure_addon_reconfigure()
async def async_step_configure_addon_reconfigure(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Ask for config for Z-Wave JS add-on."""
addon_info = await self._async_get_addon_info()
addon_config = addon_info.options
if user_input is not None:
self.s0_legacy_key = user_input[CONF_S0_LEGACY_KEY]
self.s2_access_control_key = user_input[CONF_S2_ACCESS_CONTROL_KEY]
self.s2_authenticated_key = user_input[CONF_S2_AUTHENTICATED_KEY]
self.s2_unauthenticated_key = user_input[CONF_S2_UNAUTHENTICATED_KEY]
self.lr_s2_access_control_key = user_input[CONF_LR_S2_ACCESS_CONTROL_KEY]
self.lr_s2_authenticated_key = user_input[CONF_LR_S2_AUTHENTICATED_KEY]
self.usb_path = user_input[CONF_USB_PATH]
addon_config_updates = {
CONF_ADDON_DEVICE: self.usb_path,
CONF_ADDON_S0_LEGACY_KEY: self.s0_legacy_key,
CONF_ADDON_S2_ACCESS_CONTROL_KEY: self.s2_access_control_key,
CONF_ADDON_S2_AUTHENTICATED_KEY: self.s2_authenticated_key,
CONF_ADDON_S2_UNAUTHENTICATED_KEY: self.s2_unauthenticated_key,
CONF_ADDON_LR_S2_ACCESS_CONTROL_KEY: self.lr_s2_access_control_key,
CONF_ADDON_LR_S2_AUTHENTICATED_KEY: self.lr_s2_authenticated_key,
CONF_ADDON_LOG_LEVEL: user_input[CONF_LOG_LEVEL],
CONF_ADDON_EMULATE_HARDWARE: user_input.get(
CONF_EMULATE_HARDWARE, False
),
}
await self._async_set_addon_config(addon_config_updates)
if addon_info.state == AddonState.RUNNING and not self.restart_addon:
return await self.async_step_finish_addon_setup_reconfigure()
if (
config_entry := self._reconfigure_config_entry
) and config_entry.data.get(CONF_USE_ADDON):
# Disconnect integration before restarting add-on.
await self.hass.config_entries.async_unload(config_entry.entry_id)
return await self.async_step_start_addon()
usb_path = addon_config.get(CONF_ADDON_DEVICE, self.usb_path or "")
s0_legacy_key = addon_config.get(
CONF_ADDON_S0_LEGACY_KEY, self.s0_legacy_key or ""
)
s2_access_control_key = addon_config.get(
CONF_ADDON_S2_ACCESS_CONTROL_KEY, self.s2_access_control_key or ""
)
s2_authenticated_key = addon_config.get(
CONF_ADDON_S2_AUTHENTICATED_KEY, self.s2_authenticated_key or ""
)
s2_unauthenticated_key = addon_config.get(
CONF_ADDON_S2_UNAUTHENTICATED_KEY, self.s2_unauthenticated_key or ""
)
lr_s2_access_control_key = addon_config.get(
CONF_ADDON_LR_S2_ACCESS_CONTROL_KEY, self.lr_s2_access_control_key or ""
)
lr_s2_authenticated_key = addon_config.get(
CONF_ADDON_LR_S2_AUTHENTICATED_KEY, self.lr_s2_authenticated_key or ""
)
log_level = addon_config.get(CONF_ADDON_LOG_LEVEL, "info")
emulate_hardware = addon_config.get(CONF_ADDON_EMULATE_HARDWARE, False)
try:
ports = await async_get_usb_ports(self.hass)
except OSError as err:
_LOGGER.error("Failed to get USB ports: %s", err)
return self.async_abort(reason="usb_ports_failed")
data_schema = vol.Schema(
{
vol.Required(CONF_USB_PATH, default=usb_path): vol.In(ports),
vol.Optional(CONF_S0_LEGACY_KEY, default=s0_legacy_key): str,
vol.Optional(
CONF_S2_ACCESS_CONTROL_KEY, default=s2_access_control_key
): str,
vol.Optional(
CONF_S2_AUTHENTICATED_KEY, default=s2_authenticated_key
): str,
vol.Optional(
CONF_S2_UNAUTHENTICATED_KEY, default=s2_unauthenticated_key
): str,
vol.Optional(
CONF_LR_S2_ACCESS_CONTROL_KEY, default=lr_s2_access_control_key
): str,
vol.Optional(
CONF_LR_S2_AUTHENTICATED_KEY, default=lr_s2_authenticated_key
): str,
vol.Optional(CONF_LOG_LEVEL, default=log_level): vol.In(
ADDON_LOG_LEVELS
),
vol.Optional(CONF_EMULATE_HARDWARE, default=emulate_hardware): bool,
}
)
return self.async_show_form(
step_id="configure_addon_reconfigure", data_schema=data_schema
)
async def async_step_choose_serial_port(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Choose a serial port."""
if user_input is not None:
self.usb_path = user_input[CONF_USB_PATH]
await self._async_set_addon_config({CONF_ADDON_DEVICE: self.usb_path})
return await self.async_step_start_addon()
try:
ports = await async_get_usb_ports(self.hass)
except OSError as err:
_LOGGER.error("Failed to get USB ports: %s", err)
return self.async_abort(reason="usb_ports_failed")
data_schema = vol.Schema(
{
vol.Required(CONF_USB_PATH): vol.In(ports),
}
)
return self.async_show_form(
step_id="choose_serial_port", data_schema=data_schema
)
async def async_step_backup_failed(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Backup failed."""
return self.async_abort(reason="backup_failed")
async def async_step_restore_failed(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Restore failed."""
if user_input is not None:
return await self.async_step_restore_nvm()
return self.async_show_form(
step_id="restore_failed",
description_placeholders={
"file_path": str(self.backup_filepath),
},
)
async def async_step_migration_done(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Migration done."""
return self.async_abort(reason="migration_successful")
async def async_step_finish_addon_setup_migrate(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Prepare info needed to complete the config entry update."""
ws_address = self.ws_address
assert ws_address is not None
version_info = self.version_info
assert version_info is not None
# We need to wait for the config entry to be reloaded,
# before restoring the backup.
# We will do this in the restore nvm progress task,
# to get a nicer user experience.
self._async_update_entry(
{
"unique_id": str(version_info.home_id),
CONF_URL: ws_address,
CONF_USB_PATH: self.usb_path,
CONF_S0_LEGACY_KEY: self.s0_legacy_key,
CONF_S2_ACCESS_CONTROL_KEY: self.s2_access_control_key,
CONF_S2_AUTHENTICATED_KEY: self.s2_authenticated_key,
CONF_S2_UNAUTHENTICATED_KEY: self.s2_unauthenticated_key,
CONF_LR_S2_ACCESS_CONTROL_KEY: self.lr_s2_access_control_key,
CONF_LR_S2_AUTHENTICATED_KEY: self.lr_s2_authenticated_key,
CONF_USE_ADDON: True,
CONF_INTEGRATION_CREATED_ADDON: self.integration_created_addon,
},
schedule_reload=False,
)
return await self.async_step_restore_nvm()
async def async_step_finish_addon_setup_reconfigure(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Prepare info needed to complete the config entry update.
Get add-on discovery info and server version info.
Check for same unique id and abort if not the same unique id.
"""
config_entry = self._reconfigure_config_entry
assert config_entry is not None
if self.revert_reason:
self.original_addon_config = None
reason = self.revert_reason
self.revert_reason = None
return await self.async_revert_addon_config(reason=reason)
if not self.ws_address:
discovery_info = await self._async_get_addon_discovery_info()
self.ws_address = f"ws://{discovery_info['host']}:{discovery_info['port']}"
if not self.version_info:
try:
self.version_info = await async_get_version_info(
self.hass, self.ws_address
)
except CannotConnect:
return await self.async_revert_addon_config(reason="cannot_connect")
if config_entry.unique_id != str(self.version_info.home_id):
return await self.async_revert_addon_config(reason="different_device")
self._async_update_entry(
{
CONF_URL: self.ws_address,
CONF_USB_PATH: self.usb_path,
CONF_S0_LEGACY_KEY: self.s0_legacy_key,
CONF_S2_ACCESS_CONTROL_KEY: self.s2_access_control_key,
CONF_S2_AUTHENTICATED_KEY: self.s2_authenticated_key,
CONF_S2_UNAUTHENTICATED_KEY: self.s2_unauthenticated_key,
CONF_LR_S2_ACCESS_CONTROL_KEY: self.lr_s2_access_control_key,
CONF_LR_S2_AUTHENTICATED_KEY: self.lr_s2_authenticated_key,
CONF_USE_ADDON: True,
CONF_INTEGRATION_CREATED_ADDON: self.integration_created_addon,
}
)
return self.async_abort(reason="reconfigure_successful")
async def async_revert_addon_config(self, reason: str) -> ConfigFlowResult:
"""Abort the options flow.
If the add-on options have been changed, revert those and restart add-on.
"""
# If reverting the add-on options failed, abort immediately.
if self.revert_reason:
_LOGGER.error(
"Failed to revert add-on options before aborting flow, reason: %s",
reason,
)
if self.revert_reason or not self.original_addon_config:
config_entry = self._reconfigure_config_entry
assert config_entry is not None
self.hass.config_entries.async_schedule_reload(config_entry.entry_id)
return self.async_abort(reason=reason)
self.revert_reason = reason
addon_config_input = {
ADDON_USER_INPUT_MAP[addon_key]: addon_val
for addon_key, addon_val in self.original_addon_config.items()
if addon_key in ADDON_USER_INPUT_MAP
}
_LOGGER.debug("Reverting add-on options, reason: %s", reason)
return await self.async_step_configure_addon_reconfigure(addon_config_input)
async def _async_backup_network(self) -> None:
"""Backup the current network."""
@callback
def forward_progress(event: dict) -> None:
"""Forward progress events to frontend."""
self.async_update_progress(event["bytesRead"] / event["total"])
controller = self._get_driver().controller
unsub = controller.on("nvm backup progress", forward_progress)
try:
self.backup_data = await controller.async_backup_nvm_raw()
except FailedCommand as err:
raise AbortFlow(f"Failed to backup network: {err}") from err
finally:
unsub()
# save the backup to a file just in case
self.backup_filepath = self.hass.config.path(
f"zwavejs_nvm_backup_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.bin"
)
try:
await self.hass.async_add_executor_job(
Path(self.backup_filepath).write_bytes,
self.backup_data,
)
except OSError as err:
raise AbortFlow(f"Failed to save backup file: {err}") from err
async def _async_restore_network_backup(self) -> None:
"""Restore the backup."""
assert self.backup_data is not None
config_entry = self._reconfigure_config_entry
assert config_entry is not None
# Reload the config entry to reconnect the client after the addon restart
await self.hass.config_entries.async_reload(config_entry.entry_id)
@callback
def forward_progress(event: dict) -> None:
"""Forward progress events to frontend."""
if event["event"] == "nvm convert progress":
# assume convert is 50% of the total progress
self.async_update_progress(event["bytesRead"] / event["total"] * 0.5)
elif event["event"] == "nvm restore progress":
# assume restore is the rest of the progress
self.async_update_progress(
event["bytesWritten"] / event["total"] * 0.5 + 0.5
)
@callback
def set_driver_ready(event: dict) -> None:
"Set the driver ready event."
wait_driver_ready.set()
driver = self._get_driver()
controller = driver.controller
wait_driver_ready = asyncio.Event()
unsubs = [
controller.on("nvm convert progress", forward_progress),
controller.on("nvm restore progress", forward_progress),
driver.once("driver ready", set_driver_ready),
]
try:
await controller.async_restore_nvm(self.backup_data)
except FailedCommand as err:
raise AbortFlow(f"Failed to restore network: {err}") from err
else:
with suppress(TimeoutError):
async with asyncio.timeout(RESTORE_NVM_DRIVER_READY_TIMEOUT):
await wait_driver_ready.wait()
await self.hass.config_entries.async_reload(config_entry.entry_id)
finally:
for unsub in unsubs:
unsub()
def _get_driver(self) -> Driver:
"""Get the driver from the config entry."""
config_entry = self._reconfigure_config_entry
assert config_entry is not None
if config_entry.state != ConfigEntryState.LOADED:
raise AbortFlow("Configuration entry is not loaded")
client: Client = config_entry.runtime_data[DATA_CLIENT]
assert client.driver is not None
return client.driver
class CannotConnect(HomeAssistantError):
"""Indicate connection error."""
class InvalidInput(HomeAssistantError):
"""Error to indicate input data is invalid."""
def __init__(self, error: str) -> None:
"""Initialize error."""
super().__init__()
self.error = error