mirror of
https://github.com/home-assistant/core.git
synced 2025-04-19 14:57:52 +00:00
New ZWave-JS migration flow (#142717)
* ZwaveJS radio migration flow * Partial migration flow * basic migration flow * report exact progress to frontend * Display backup file path * string tweak * update tests * improve exception handling * radio -> controller * test tweak * test tweak * clean up and test error handling * more tests * test progress * PR comments * fix tests * test restore progress * more coverage * coverage * coverage * make mypy happy * PR comments * Apply suggestions from code review Co-authored-by: Martin Hjelmare <marhje52@gmail.com> * ruff --------- Co-authored-by: Martin Hjelmare <marhje52@gmail.com>
This commit is contained in:
parent
4d959fb91c
commit
cadbb623c7
@ -4,12 +4,17 @@ from __future__ import annotations
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
import asyncio
|
||||
from datetime import datetime
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import aiohttp
|
||||
from serial.tools import list_ports
|
||||
import voluptuous as vol
|
||||
from zwave_js_server.client import Client
|
||||
from zwave_js_server.exceptions import FailedCommand
|
||||
from zwave_js_server.model.driver import Driver
|
||||
from zwave_js_server.version import VersionInfo, get_server_version
|
||||
|
||||
from homeassistant.components import usb
|
||||
@ -23,6 +28,7 @@ from homeassistant.config_entries import (
|
||||
SOURCE_USB,
|
||||
ConfigEntry,
|
||||
ConfigEntryBaseFlow,
|
||||
ConfigEntryState,
|
||||
ConfigFlow,
|
||||
ConfigFlowResult,
|
||||
OptionsFlow,
|
||||
@ -60,6 +66,7 @@ from .const import (
|
||||
CONF_S2_UNAUTHENTICATED_KEY,
|
||||
CONF_USB_PATH,
|
||||
CONF_USE_ADDON,
|
||||
DATA_CLIENT,
|
||||
DOMAIN,
|
||||
)
|
||||
|
||||
@ -74,6 +81,9 @@ CONF_EMULATE_HARDWARE = "emulate_hardware"
|
||||
CONF_LOG_LEVEL = "log_level"
|
||||
SERVER_VERSION_TIMEOUT = 10
|
||||
|
||||
OPTIONS_INTENT_MIGRATE = "intent_migrate"
|
||||
OPTIONS_INTENT_RECONFIGURE = "intent_reconfigure"
|
||||
|
||||
ADDON_LOG_LEVELS = {
|
||||
"error": "Error",
|
||||
"warn": "Warn",
|
||||
@ -636,7 +646,12 @@ class ZWaveJSConfigFlow(BaseZwaveJSFlow, ConfigFlow, domain=DOMAIN):
|
||||
}
|
||||
|
||||
if not self._usb_discovery:
|
||||
ports = await async_get_usb_ports(self.hass)
|
||||
try:
|
||||
ports = await async_get_usb_ports(self.hass)
|
||||
except OSError as err:
|
||||
_LOGGER.error("Failed to get USB ports: %s", err)
|
||||
return self.async_abort(reason="usb_ports_failed")
|
||||
|
||||
schema = {
|
||||
vol.Required(CONF_USB_PATH, default=usb_path): vol.In(ports),
|
||||
**schema,
|
||||
@ -717,6 +732,10 @@ class OptionsFlowHandler(BaseZwaveJSFlow, OptionsFlow):
|
||||
super().__init__()
|
||||
self.original_addon_config: dict[str, Any] | None = None
|
||||
self.revert_reason: str | None = None
|
||||
self.backup_task: asyncio.Task | None = None
|
||||
self.restore_backup_task: asyncio.Task | None = None
|
||||
self.backup_data: bytes | None = None
|
||||
self.backup_filepath: str | None = None
|
||||
|
||||
@callback
|
||||
def _async_update_entry(self, data: dict[str, Any]) -> None:
|
||||
@ -725,6 +744,18 @@ class OptionsFlowHandler(BaseZwaveJSFlow, OptionsFlow):
|
||||
|
||||
async def async_step_init(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> ConfigFlowResult:
|
||||
"""Confirm if we are migrating adapters or just re-configuring."""
|
||||
return self.async_show_menu(
|
||||
step_id="init",
|
||||
menu_options=[
|
||||
OPTIONS_INTENT_RECONFIGURE,
|
||||
OPTIONS_INTENT_MIGRATE,
|
||||
],
|
||||
)
|
||||
|
||||
async def async_step_intent_reconfigure(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> ConfigFlowResult:
|
||||
"""Manage the options."""
|
||||
if is_hassio(self.hass):
|
||||
@ -732,6 +763,91 @@ class OptionsFlowHandler(BaseZwaveJSFlow, OptionsFlow):
|
||||
|
||||
return await self.async_step_manual()
|
||||
|
||||
async def async_step_intent_migrate(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> ConfigFlowResult:
|
||||
"""Confirm the user wants to reset their current controller."""
|
||||
if not self.config_entry.data.get(CONF_USE_ADDON):
|
||||
return self.async_abort(reason="addon_required")
|
||||
|
||||
if user_input is not None:
|
||||
return await self.async_step_backup_nvm()
|
||||
|
||||
return self.async_show_form(step_id="intent_migrate")
|
||||
|
||||
async def async_step_backup_nvm(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> ConfigFlowResult:
|
||||
"""Backup the current network."""
|
||||
if self.backup_task is None:
|
||||
self.backup_task = self.hass.async_create_task(self._async_backup_network())
|
||||
|
||||
if not self.backup_task.done():
|
||||
return self.async_show_progress(
|
||||
step_id="backup_nvm",
|
||||
progress_action="backup_nvm",
|
||||
progress_task=self.backup_task,
|
||||
)
|
||||
|
||||
try:
|
||||
await self.backup_task
|
||||
except AbortFlow as err:
|
||||
_LOGGER.error(err)
|
||||
return self.async_show_progress_done(next_step_id="backup_failed")
|
||||
finally:
|
||||
self.backup_task = None
|
||||
|
||||
return self.async_show_progress_done(next_step_id="instruct_unplug")
|
||||
|
||||
async def async_step_restore_nvm(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> ConfigFlowResult:
|
||||
"""Restore the backup."""
|
||||
if self.restore_backup_task is None:
|
||||
self.restore_backup_task = self.hass.async_create_task(
|
||||
self._async_restore_network_backup()
|
||||
)
|
||||
|
||||
if not self.restore_backup_task.done():
|
||||
return self.async_show_progress(
|
||||
step_id="restore_nvm",
|
||||
progress_action="restore_nvm",
|
||||
progress_task=self.restore_backup_task,
|
||||
)
|
||||
|
||||
try:
|
||||
await self.restore_backup_task
|
||||
except AbortFlow as err:
|
||||
_LOGGER.error(err)
|
||||
return self.async_show_progress_done(next_step_id="restore_failed")
|
||||
finally:
|
||||
self.restore_backup_task = None
|
||||
|
||||
return self.async_show_progress_done(next_step_id="migration_done")
|
||||
|
||||
async def async_step_instruct_unplug(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> ConfigFlowResult:
|
||||
"""Reset the current controller, and instruct the user to unplug it."""
|
||||
|
||||
if user_input is not None:
|
||||
# Now that the old controller is gone, we can scan for serial ports again
|
||||
return await self.async_step_choose_serial_port()
|
||||
|
||||
# reset the old controller
|
||||
try:
|
||||
await self._get_driver().async_hard_reset()
|
||||
except FailedCommand as err:
|
||||
_LOGGER.error("Failed to reset controller: %s", err)
|
||||
return self.async_abort(reason="reset_failed")
|
||||
|
||||
return self.async_show_form(
|
||||
step_id="instruct_unplug",
|
||||
description_placeholders={
|
||||
"file_path": str(self.backup_filepath),
|
||||
},
|
||||
)
|
||||
|
||||
async def async_step_manual(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> ConfigFlowResult:
|
||||
@ -881,7 +997,11 @@ class OptionsFlowHandler(BaseZwaveJSFlow, OptionsFlow):
|
||||
log_level = addon_config.get(CONF_ADDON_LOG_LEVEL, "info")
|
||||
emulate_hardware = addon_config.get(CONF_ADDON_EMULATE_HARDWARE, False)
|
||||
|
||||
ports = await async_get_usb_ports(self.hass)
|
||||
try:
|
||||
ports = await async_get_usb_ports(self.hass)
|
||||
except OSError as err:
|
||||
_LOGGER.error("Failed to get USB ports: %s", err)
|
||||
return self.async_abort(reason="usb_ports_failed")
|
||||
|
||||
data_schema = vol.Schema(
|
||||
{
|
||||
@ -911,12 +1031,64 @@ class OptionsFlowHandler(BaseZwaveJSFlow, OptionsFlow):
|
||||
|
||||
return self.async_show_form(step_id="configure_addon", data_schema=data_schema)
|
||||
|
||||
async def async_step_choose_serial_port(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> ConfigFlowResult:
|
||||
"""Choose a serial port."""
|
||||
if user_input is not None:
|
||||
addon_info = await self._async_get_addon_info()
|
||||
addon_config = addon_info.options
|
||||
self.usb_path = user_input[CONF_USB_PATH]
|
||||
new_addon_config = {
|
||||
**addon_config,
|
||||
CONF_ADDON_DEVICE: self.usb_path,
|
||||
}
|
||||
if addon_info.state == AddonState.RUNNING:
|
||||
self.restart_addon = True
|
||||
# Copy the add-on config to keep the objects separate.
|
||||
self.original_addon_config = dict(addon_config)
|
||||
await self._async_set_addon_config(new_addon_config)
|
||||
return await self.async_step_start_addon()
|
||||
|
||||
try:
|
||||
ports = await async_get_usb_ports(self.hass)
|
||||
except OSError as err:
|
||||
_LOGGER.error("Failed to get USB ports: %s", err)
|
||||
return self.async_abort(reason="usb_ports_failed")
|
||||
|
||||
data_schema = vol.Schema(
|
||||
{
|
||||
vol.Required(CONF_USB_PATH): vol.In(ports),
|
||||
}
|
||||
)
|
||||
return self.async_show_form(
|
||||
step_id="choose_serial_port", data_schema=data_schema
|
||||
)
|
||||
|
||||
async def async_step_start_failed(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> ConfigFlowResult:
|
||||
"""Add-on start failed."""
|
||||
return await self.async_revert_addon_config(reason="addon_start_failed")
|
||||
|
||||
async def async_step_backup_failed(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> ConfigFlowResult:
|
||||
"""Backup failed."""
|
||||
return self.async_abort(reason="backup_failed")
|
||||
|
||||
async def async_step_restore_failed(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> ConfigFlowResult:
|
||||
"""Restore failed."""
|
||||
return self.async_abort(reason="restore_failed")
|
||||
|
||||
async def async_step_migration_done(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> ConfigFlowResult:
|
||||
"""Migration done."""
|
||||
return self.async_create_entry(title=TITLE, data={})
|
||||
|
||||
async def async_step_finish_addon_setup(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> ConfigFlowResult:
|
||||
@ -943,12 +1115,16 @@ class OptionsFlowHandler(BaseZwaveJSFlow, OptionsFlow):
|
||||
except CannotConnect:
|
||||
return await self.async_revert_addon_config(reason="cannot_connect")
|
||||
|
||||
if self.config_entry.unique_id != str(self.version_info.home_id):
|
||||
if self.backup_data is None and self.config_entry.unique_id != str(
|
||||
self.version_info.home_id
|
||||
):
|
||||
return await self.async_revert_addon_config(reason="different_device")
|
||||
|
||||
self._async_update_entry(
|
||||
{
|
||||
**self.config_entry.data,
|
||||
# this will only be different in a migration flow
|
||||
"unique_id": str(self.version_info.home_id),
|
||||
CONF_URL: self.ws_address,
|
||||
CONF_USB_PATH: self.usb_path,
|
||||
CONF_S0_LEGACY_KEY: self.s0_legacy_key,
|
||||
@ -961,6 +1137,9 @@ class OptionsFlowHandler(BaseZwaveJSFlow, OptionsFlow):
|
||||
CONF_INTEGRATION_CREATED_ADDON: self.integration_created_addon,
|
||||
}
|
||||
)
|
||||
if self.backup_data:
|
||||
return await self.async_step_restore_nvm()
|
||||
|
||||
# Always reload entry since we may have disconnected the client.
|
||||
self.hass.config_entries.async_schedule_reload(self.config_entry.entry_id)
|
||||
return self.async_create_entry(title=TITLE, data={})
|
||||
@ -990,6 +1169,74 @@ class OptionsFlowHandler(BaseZwaveJSFlow, OptionsFlow):
|
||||
_LOGGER.debug("Reverting add-on options, reason: %s", reason)
|
||||
return await self.async_step_configure_addon(addon_config_input)
|
||||
|
||||
async def _async_backup_network(self) -> None:
|
||||
"""Backup the current network."""
|
||||
|
||||
@callback
|
||||
def forward_progress(event: dict) -> None:
|
||||
"""Forward progress events to frontend."""
|
||||
self.async_update_progress(event["bytesRead"] / event["total"])
|
||||
|
||||
controller = self._get_driver().controller
|
||||
unsub = controller.on("nvm backup progress", forward_progress)
|
||||
try:
|
||||
self.backup_data = await controller.async_backup_nvm_raw()
|
||||
except FailedCommand as err:
|
||||
raise AbortFlow(f"Failed to backup network: {err}") from err
|
||||
finally:
|
||||
unsub()
|
||||
|
||||
# save the backup to a file just in case
|
||||
self.backup_filepath = self.hass.config.path(
|
||||
f"zwavejs_nvm_backup_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.bin"
|
||||
)
|
||||
try:
|
||||
await self.hass.async_add_executor_job(
|
||||
Path(self.backup_filepath).write_bytes,
|
||||
self.backup_data,
|
||||
)
|
||||
except OSError as err:
|
||||
raise AbortFlow(f"Failed to save backup file: {err}") from err
|
||||
|
||||
async def _async_restore_network_backup(self) -> None:
|
||||
"""Restore the backup."""
|
||||
assert self.backup_data is not None
|
||||
|
||||
# Reload the config entry to reconnect the client after the addon restart
|
||||
await self.hass.config_entries.async_reload(self.config_entry.entry_id)
|
||||
|
||||
@callback
|
||||
def forward_progress(event: dict) -> None:
|
||||
"""Forward progress events to frontend."""
|
||||
if event["event"] == "nvm convert progress":
|
||||
# assume convert is 50% of the total progress
|
||||
self.async_update_progress(event["bytesRead"] / event["total"] * 0.5)
|
||||
elif event["event"] == "nvm restore progress":
|
||||
# assume restore is the rest of the progress
|
||||
self.async_update_progress(
|
||||
event["bytesWritten"] / event["total"] * 0.5 + 0.5
|
||||
)
|
||||
|
||||
controller = self._get_driver().controller
|
||||
unsubs = [
|
||||
controller.on("nvm convert progress", forward_progress),
|
||||
controller.on("nvm restore progress", forward_progress),
|
||||
]
|
||||
try:
|
||||
await controller.async_restore_nvm(self.backup_data)
|
||||
except FailedCommand as err:
|
||||
raise AbortFlow(f"Failed to restore network: {err}") from err
|
||||
finally:
|
||||
for unsub in unsubs:
|
||||
unsub()
|
||||
|
||||
def _get_driver(self) -> Driver:
|
||||
if self.config_entry.state != ConfigEntryState.LOADED:
|
||||
raise AbortFlow("Configuration entry is not loaded")
|
||||
client: Client = self.config_entry.runtime_data[DATA_CLIENT]
|
||||
assert client.driver is not None
|
||||
return client.driver
|
||||
|
||||
|
||||
class CannotConnect(HomeAssistantError):
|
||||
"""Indicate connection error."""
|
||||
|
@ -11,7 +11,11 @@
|
||||
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
|
||||
"discovery_requires_supervisor": "Discovery requires the supervisor.",
|
||||
"not_zwave_device": "Discovered device is not a Z-Wave device.",
|
||||
"not_zwave_js_addon": "Discovered add-on is not the official Z-Wave add-on."
|
||||
"not_zwave_js_addon": "Discovered add-on is not the official Z-Wave add-on.",
|
||||
"backup_failed": "Failed to backup network.",
|
||||
"restore_failed": "Failed to restore network.",
|
||||
"reset_failed": "Failed to reset controller.",
|
||||
"usb_ports_failed": "Failed to get USB devices."
|
||||
},
|
||||
"error": {
|
||||
"addon_start_failed": "Failed to start the Z-Wave add-on. Check the configuration.",
|
||||
@ -22,7 +26,9 @@
|
||||
"flow_title": "{name}",
|
||||
"progress": {
|
||||
"install_addon": "Please wait while the Z-Wave add-on installation finishes. This can take several minutes.",
|
||||
"start_addon": "Please wait while the Z-Wave add-on start completes. This may take some seconds."
|
||||
"start_addon": "Please wait while the Z-Wave add-on start completes. This may take some seconds.",
|
||||
"backup_nvm": "Please wait while the network backup completes.",
|
||||
"restore_nvm": "Please wait while the network restore completes."
|
||||
},
|
||||
"step": {
|
||||
"configure_addon": {
|
||||
@ -217,7 +223,12 @@
|
||||
"addon_stop_failed": "Failed to stop the Z-Wave add-on.",
|
||||
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]",
|
||||
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
|
||||
"different_device": "The connected USB device is not the same as previously configured for this config entry. Please instead create a new config entry for the new device."
|
||||
"different_device": "The connected USB device is not the same as previously configured for this config entry. Please instead create a new config entry for the new device.",
|
||||
"addon_required": "The Z-Wave migration flow requires the integration to be configured using the Z-Wave Supervisor add-on. You can still use the Backup and Restore buttons to migrate your network manually.",
|
||||
"backup_failed": "[%key:component::zwave_js::config::abort::backup_failed%]",
|
||||
"restore_failed": "[%key:component::zwave_js::config::abort::restore_failed%]",
|
||||
"reset_failed": "[%key:component::zwave_js::config::abort::reset_failed%]",
|
||||
"usb_ports_failed": "[%key:component::zwave_js::config::abort::usb_ports_failed%]"
|
||||
},
|
||||
"error": {
|
||||
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
|
||||
@ -226,9 +237,27 @@
|
||||
},
|
||||
"progress": {
|
||||
"install_addon": "[%key:component::zwave_js::config::progress::install_addon%]",
|
||||
"start_addon": "[%key:component::zwave_js::config::progress::start_addon%]"
|
||||
"start_addon": "[%key:component::zwave_js::config::progress::start_addon%]",
|
||||
"backup_nvm": "[%key:component::zwave_js::config::progress::backup_nvm%]",
|
||||
"restore_nvm": "[%key:component::zwave_js::config::progress::restore_nvm%]"
|
||||
},
|
||||
"step": {
|
||||
"init": {
|
||||
"title": "Migrate or re-configure",
|
||||
"description": "Are you migrating to a new controller or re-configuring the current controller?",
|
||||
"menu_options": {
|
||||
"intent_migrate": "Migrate to a new controller",
|
||||
"intent_reconfigure": "Re-configure the current controller"
|
||||
}
|
||||
},
|
||||
"intent_migrate": {
|
||||
"title": "[%key:component::zwave_js::options::step::init::menu_options::intent_migrate%]",
|
||||
"description": "Before setting up your new controller, your old controller needs to be reset. A backup will be performed first.\n\nDo you wish to continue?"
|
||||
},
|
||||
"instruct_unplug": {
|
||||
"title": "Unplug your old controller",
|
||||
"description": "Backup saved to \"{file_path}\"\n\nYour old controller has been reset. If the hardware is no longer needed, you can now unplug it.\n\nPlease make sure your new controller is plugged in before continuing."
|
||||
},
|
||||
"configure_addon": {
|
||||
"data": {
|
||||
"emulate_hardware": "Emulate Hardware",
|
||||
@ -242,6 +271,12 @@
|
||||
"description": "[%key:component::zwave_js::config::step::configure_addon::description%]",
|
||||
"title": "[%key:component::zwave_js::config::step::configure_addon::title%]"
|
||||
},
|
||||
"choose_serial_port": {
|
||||
"data": {
|
||||
"usb_path": "[%key:common::config_flow::data::usb_path%]"
|
||||
},
|
||||
"title": "Select your Z-Wave device"
|
||||
},
|
||||
"install_addon": {
|
||||
"title": "[%key:component::zwave_js::config::step::install_addon::title%]"
|
||||
},
|
||||
|
@ -13,18 +13,23 @@ from aiohasupervisor.models import AddonsOptions, Discovery
|
||||
import aiohttp
|
||||
import pytest
|
||||
from serial.tools.list_ports_common import ListPortInfo
|
||||
from zwave_js_server.exceptions import FailedCommand
|
||||
from zwave_js_server.version import VersionInfo
|
||||
|
||||
from homeassistant import config_entries
|
||||
from homeassistant.components.zwave_js.config_flow import SERVER_VERSION_TIMEOUT, TITLE
|
||||
from homeassistant.components.zwave_js.const import ADDON_SLUG, DOMAIN
|
||||
from homeassistant import config_entries, data_entry_flow
|
||||
from homeassistant.components.zwave_js.config_flow import (
|
||||
SERVER_VERSION_TIMEOUT,
|
||||
TITLE,
|
||||
OptionsFlowHandler,
|
||||
)
|
||||
from homeassistant.components.zwave_js.const import ADDON_SLUG, CONF_USB_PATH, DOMAIN
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.data_entry_flow import FlowResultType
|
||||
from homeassistant.helpers.service_info.hassio import HassioServiceInfo
|
||||
from homeassistant.helpers.service_info.usb import UsbServiceInfo
|
||||
from homeassistant.helpers.service_info.zeroconf import ZeroconfServiceInfo
|
||||
|
||||
from tests.common import MockConfigEntry
|
||||
from tests.common import MockConfigEntry, async_capture_events
|
||||
|
||||
ADDON_DISCOVERY_INFO = {
|
||||
"addon": "Z-Wave JS",
|
||||
@ -229,18 +234,48 @@ async def slow_server_version(*args):
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("flow", "flow_params"),
|
||||
("url", "server_version_side_effect", "server_version_timeout", "error"),
|
||||
[
|
||||
(
|
||||
"flow",
|
||||
lambda entry: {
|
||||
"handler": DOMAIN,
|
||||
"context": {"source": config_entries.SOURCE_USER},
|
||||
},
|
||||
"not-ws-url",
|
||||
None,
|
||||
SERVER_VERSION_TIMEOUT,
|
||||
"invalid_ws_url",
|
||||
),
|
||||
(
|
||||
"ws://localhost:3000",
|
||||
slow_server_version,
|
||||
0,
|
||||
"cannot_connect",
|
||||
),
|
||||
(
|
||||
"ws://localhost:3000",
|
||||
Exception("Boom"),
|
||||
SERVER_VERSION_TIMEOUT,
|
||||
"unknown",
|
||||
),
|
||||
("options", lambda entry: {"handler": entry.entry_id}),
|
||||
],
|
||||
)
|
||||
async def test_manual_errors(hass: HomeAssistant, integration, url, error) -> None:
|
||||
"""Test all errors with a manual set up."""
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||
)
|
||||
|
||||
assert result["type"] is FlowResultType.FORM
|
||||
assert result["step_id"] == "manual"
|
||||
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"],
|
||||
{
|
||||
"url": url,
|
||||
},
|
||||
)
|
||||
|
||||
assert result["step_id"] == "manual"
|
||||
assert result["errors"] == {"base": error}
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("url", "server_version_side_effect", "server_version_timeout", "error"),
|
||||
[
|
||||
@ -264,24 +299,28 @@ async def slow_server_version(*args):
|
||||
),
|
||||
],
|
||||
)
|
||||
async def test_manual_errors(
|
||||
hass: HomeAssistant, integration, url, error, flow, flow_params
|
||||
async def test_manual_errors_options_flow(
|
||||
hass: HomeAssistant, integration, url, error
|
||||
) -> None:
|
||||
"""Test all errors with a manual set up."""
|
||||
entry = integration
|
||||
result = await getattr(hass.config_entries, flow).async_init(**flow_params(entry))
|
||||
result = await hass.config_entries.options.async_init(integration.entry_id)
|
||||
|
||||
assert result["type"] is FlowResultType.MENU
|
||||
assert result["step_id"] == "init"
|
||||
result = await hass.config_entries.options.async_configure(
|
||||
result["flow_id"], {"next_step_id": "intent_reconfigure"}
|
||||
)
|
||||
|
||||
assert result["type"] is FlowResultType.FORM
|
||||
assert result["step_id"] == "manual"
|
||||
|
||||
result = await getattr(hass.config_entries, flow).async_configure(
|
||||
result = await hass.config_entries.options.async_configure(
|
||||
result["flow_id"],
|
||||
{
|
||||
"url": url,
|
||||
},
|
||||
)
|
||||
|
||||
assert result["type"] is FlowResultType.FORM
|
||||
assert result["step_id"] == "manual"
|
||||
assert result["errors"] == {"base": error}
|
||||
|
||||
@ -1717,6 +1756,32 @@ async def test_addon_installed_set_options_failure(
|
||||
assert start_addon.call_count == 0
|
||||
|
||||
|
||||
async def test_addon_installed_usb_ports_failure(
|
||||
hass: HomeAssistant,
|
||||
supervisor,
|
||||
addon_installed,
|
||||
) -> None:
|
||||
"""Test usb ports failure when add-on is installed."""
|
||||
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN, context={"source": config_entries.SOURCE_USER}
|
||||
)
|
||||
|
||||
assert result["type"] is FlowResultType.FORM
|
||||
assert result["step_id"] == "on_supervisor"
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.zwave_js.config_flow.async_get_usb_ports",
|
||||
side_effect=OSError("test_error"),
|
||||
):
|
||||
result = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"], {"use_addon": True}
|
||||
)
|
||||
|
||||
assert result["type"] is FlowResultType.ABORT
|
||||
assert result["reason"] == "usb_ports_failed"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"discovery_info",
|
||||
[
|
||||
@ -1972,6 +2037,13 @@ async def test_options_manual(hass: HomeAssistant, client, integration) -> None:
|
||||
|
||||
result = await hass.config_entries.options.async_init(entry.entry_id)
|
||||
|
||||
assert result["type"] is FlowResultType.MENU
|
||||
assert result["step_id"] == "init"
|
||||
|
||||
result = await hass.config_entries.options.async_configure(
|
||||
result["flow_id"], {"next_step_id": "intent_reconfigure"}
|
||||
)
|
||||
|
||||
assert result["type"] is FlowResultType.FORM
|
||||
assert result["step_id"] == "manual"
|
||||
|
||||
@ -1997,6 +2069,13 @@ async def test_options_manual_different_device(
|
||||
|
||||
result = await hass.config_entries.options.async_init(entry.entry_id)
|
||||
|
||||
assert result["type"] is FlowResultType.MENU
|
||||
assert result["step_id"] == "init"
|
||||
|
||||
result = await hass.config_entries.options.async_configure(
|
||||
result["flow_id"], {"next_step_id": "intent_reconfigure"}
|
||||
)
|
||||
|
||||
assert result["type"] is FlowResultType.FORM
|
||||
assert result["step_id"] == "manual"
|
||||
|
||||
@ -2021,6 +2100,13 @@ async def test_options_not_addon(
|
||||
|
||||
result = await hass.config_entries.options.async_init(entry.entry_id)
|
||||
|
||||
assert result["type"] is FlowResultType.MENU
|
||||
assert result["step_id"] == "init"
|
||||
|
||||
result = await hass.config_entries.options.async_configure(
|
||||
result["flow_id"], {"next_step_id": "intent_reconfigure"}
|
||||
)
|
||||
|
||||
assert result["type"] is FlowResultType.FORM
|
||||
assert result["step_id"] == "on_supervisor"
|
||||
|
||||
@ -2069,6 +2155,13 @@ async def test_options_not_addon_with_addon(
|
||||
|
||||
result = await hass.config_entries.options.async_init(entry.entry_id)
|
||||
|
||||
assert result["type"] is FlowResultType.MENU
|
||||
assert result["step_id"] == "init"
|
||||
|
||||
result = await hass.config_entries.options.async_configure(
|
||||
result["flow_id"], {"next_step_id": "intent_reconfigure"}
|
||||
)
|
||||
|
||||
assert result["type"] is FlowResultType.FORM
|
||||
assert result["step_id"] == "on_supervisor"
|
||||
|
||||
@ -2129,6 +2222,13 @@ async def test_options_not_addon_with_addon_stop_fail(
|
||||
|
||||
result = await hass.config_entries.options.async_init(entry.entry_id)
|
||||
|
||||
assert result["type"] is FlowResultType.MENU
|
||||
assert result["step_id"] == "init"
|
||||
|
||||
result = await hass.config_entries.options.async_configure(
|
||||
result["flow_id"], {"next_step_id": "intent_reconfigure"}
|
||||
)
|
||||
|
||||
assert result["type"] is FlowResultType.FORM
|
||||
assert result["step_id"] == "on_supervisor"
|
||||
|
||||
@ -2259,6 +2359,13 @@ async def test_options_addon_running(
|
||||
|
||||
result = await hass.config_entries.options.async_init(entry.entry_id)
|
||||
|
||||
assert result["type"] is FlowResultType.MENU
|
||||
assert result["step_id"] == "init"
|
||||
|
||||
result = await hass.config_entries.options.async_configure(
|
||||
result["flow_id"], {"next_step_id": "intent_reconfigure"}
|
||||
)
|
||||
|
||||
assert result["type"] is FlowResultType.FORM
|
||||
assert result["step_id"] == "on_supervisor"
|
||||
|
||||
@ -2386,6 +2493,13 @@ async def test_options_addon_running_no_changes(
|
||||
|
||||
result = await hass.config_entries.options.async_init(entry.entry_id)
|
||||
|
||||
assert result["type"] is FlowResultType.MENU
|
||||
assert result["step_id"] == "init"
|
||||
|
||||
result = await hass.config_entries.options.async_configure(
|
||||
result["flow_id"], {"next_step_id": "intent_reconfigure"}
|
||||
)
|
||||
|
||||
assert result["type"] is FlowResultType.FORM
|
||||
assert result["step_id"] == "on_supervisor"
|
||||
|
||||
@ -2559,6 +2673,13 @@ async def test_options_different_device(
|
||||
|
||||
result = await hass.config_entries.options.async_init(entry.entry_id)
|
||||
|
||||
assert result["type"] is FlowResultType.MENU
|
||||
assert result["step_id"] == "init"
|
||||
|
||||
result = await hass.config_entries.options.async_configure(
|
||||
result["flow_id"], {"next_step_id": "intent_reconfigure"}
|
||||
)
|
||||
|
||||
assert result["type"] is FlowResultType.FORM
|
||||
assert result["step_id"] == "on_supervisor"
|
||||
|
||||
@ -2735,6 +2856,13 @@ async def test_options_addon_restart_failed(
|
||||
|
||||
result = await hass.config_entries.options.async_init(entry.entry_id)
|
||||
|
||||
assert result["type"] is FlowResultType.MENU
|
||||
assert result["step_id"] == "init"
|
||||
|
||||
result = await hass.config_entries.options.async_configure(
|
||||
result["flow_id"], {"next_step_id": "intent_reconfigure"}
|
||||
)
|
||||
|
||||
assert result["type"] is FlowResultType.FORM
|
||||
assert result["step_id"] == "on_supervisor"
|
||||
|
||||
@ -2869,6 +2997,13 @@ async def test_options_addon_running_server_info_failure(
|
||||
|
||||
result = await hass.config_entries.options.async_init(entry.entry_id)
|
||||
|
||||
assert result["type"] is FlowResultType.MENU
|
||||
assert result["step_id"] == "init"
|
||||
|
||||
result = await hass.config_entries.options.async_configure(
|
||||
result["flow_id"], {"next_step_id": "intent_reconfigure"}
|
||||
)
|
||||
|
||||
assert result["type"] is FlowResultType.FORM
|
||||
assert result["step_id"] == "on_supervisor"
|
||||
|
||||
@ -2999,6 +3134,13 @@ async def test_options_addon_not_installed(
|
||||
|
||||
result = await hass.config_entries.options.async_init(entry.entry_id)
|
||||
|
||||
assert result["type"] is FlowResultType.MENU
|
||||
assert result["step_id"] == "init"
|
||||
|
||||
result = await hass.config_entries.options.async_configure(
|
||||
result["flow_id"], {"next_step_id": "intent_reconfigure"}
|
||||
)
|
||||
|
||||
assert result["type"] is FlowResultType.FORM
|
||||
assert result["step_id"] == "on_supervisor"
|
||||
|
||||
@ -3100,3 +3242,472 @@ async def test_zeroconf(hass: HomeAssistant) -> None:
|
||||
}
|
||||
assert len(mock_setup.mock_calls) == 1
|
||||
assert len(mock_setup_entry.mock_calls) == 1
|
||||
|
||||
|
||||
async def test_options_migrate_no_addon(hass: HomeAssistant, integration) -> None:
|
||||
"""Test migration flow fails when not using add-on."""
|
||||
entry = integration
|
||||
hass.config_entries.async_update_entry(
|
||||
entry, unique_id="1234", data={**entry.data, "use_addon": False}
|
||||
)
|
||||
|
||||
result = await hass.config_entries.options.async_init(entry.entry_id)
|
||||
|
||||
assert result["type"] is FlowResultType.MENU
|
||||
assert result["step_id"] == "init"
|
||||
|
||||
result = await hass.config_entries.options.async_configure(
|
||||
result["flow_id"], {"next_step_id": "intent_migrate"}
|
||||
)
|
||||
|
||||
assert result["type"] is FlowResultType.ABORT
|
||||
assert result["reason"] == "addon_required"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"discovery_info",
|
||||
[
|
||||
[
|
||||
Discovery(
|
||||
addon="core_zwave_js",
|
||||
service="zwave_js",
|
||||
uuid=uuid4(),
|
||||
config=ADDON_DISCOVERY_INFO,
|
||||
)
|
||||
]
|
||||
],
|
||||
)
|
||||
async def test_options_migrate_with_addon(
|
||||
hass: HomeAssistant,
|
||||
client,
|
||||
supervisor,
|
||||
integration,
|
||||
addon_running,
|
||||
restart_addon,
|
||||
set_addon_options,
|
||||
get_addon_discovery_info,
|
||||
) -> None:
|
||||
"""Test migration flow with add-on."""
|
||||
hass.config_entries.async_update_entry(
|
||||
integration,
|
||||
unique_id="1234",
|
||||
data={
|
||||
"url": "ws://localhost:3000",
|
||||
"use_addon": True,
|
||||
"usb_path": "/dev/ttyUSB0",
|
||||
},
|
||||
)
|
||||
|
||||
async def mock_backup_nvm_raw():
|
||||
await asyncio.sleep(0)
|
||||
client.driver.controller.emit(
|
||||
"nvm backup progress", {"bytesRead": 100, "total": 200}
|
||||
)
|
||||
return b"test_nvm_data"
|
||||
|
||||
client.driver.controller.async_backup_nvm_raw = AsyncMock(
|
||||
side_effect=mock_backup_nvm_raw
|
||||
)
|
||||
|
||||
async def mock_restore_nvm(data: bytes):
|
||||
client.driver.controller.emit(
|
||||
"nvm convert progress",
|
||||
{"event": "nvm convert progress", "bytesRead": 100, "total": 200},
|
||||
)
|
||||
await asyncio.sleep(0)
|
||||
client.driver.controller.emit(
|
||||
"nvm restore progress",
|
||||
{"event": "nvm restore progress", "bytesWritten": 100, "total": 200},
|
||||
)
|
||||
|
||||
client.driver.controller.async_restore_nvm = AsyncMock(side_effect=mock_restore_nvm)
|
||||
|
||||
hass.config_entries.async_reload = AsyncMock()
|
||||
|
||||
events = async_capture_events(
|
||||
hass, data_entry_flow.EVENT_DATA_ENTRY_FLOW_PROGRESS_UPDATE
|
||||
)
|
||||
|
||||
result = await hass.config_entries.options.async_init(integration.entry_id)
|
||||
|
||||
assert result["type"] == FlowResultType.MENU
|
||||
assert result["step_id"] == "init"
|
||||
|
||||
result = await hass.config_entries.options.async_configure(
|
||||
result["flow_id"], {"next_step_id": "intent_migrate"}
|
||||
)
|
||||
|
||||
assert result["type"] == FlowResultType.FORM
|
||||
assert result["step_id"] == "intent_migrate"
|
||||
|
||||
result = await hass.config_entries.options.async_configure(result["flow_id"], {})
|
||||
|
||||
assert result["type"] == FlowResultType.SHOW_PROGRESS
|
||||
assert result["step_id"] == "backup_nvm"
|
||||
|
||||
with patch("pathlib.Path.write_bytes", MagicMock()) as mock_file:
|
||||
await hass.async_block_till_done()
|
||||
assert client.driver.controller.async_backup_nvm_raw.call_count == 1
|
||||
assert mock_file.call_count == 1
|
||||
assert len(events) == 1
|
||||
assert events[0].data["progress"] == 0.5
|
||||
events.clear()
|
||||
|
||||
result = await hass.config_entries.options.async_configure(result["flow_id"])
|
||||
|
||||
assert result["type"] == FlowResultType.FORM
|
||||
assert result["step_id"] == "instruct_unplug"
|
||||
|
||||
result = await hass.config_entries.options.async_configure(result["flow_id"], {})
|
||||
|
||||
assert result["type"] == FlowResultType.FORM
|
||||
assert result["step_id"] == "choose_serial_port"
|
||||
assert result["data_schema"].schema[CONF_USB_PATH]
|
||||
|
||||
result = await hass.config_entries.options.async_configure(
|
||||
result["flow_id"],
|
||||
user_input={
|
||||
CONF_USB_PATH: "/test",
|
||||
},
|
||||
)
|
||||
|
||||
assert result["type"] == FlowResultType.SHOW_PROGRESS
|
||||
assert result["step_id"] == "start_addon"
|
||||
assert set_addon_options.call_args == call(
|
||||
"core_zwave_js", AddonsOptions(config={"device": "/test"})
|
||||
)
|
||||
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert restart_addon.call_args == call("core_zwave_js")
|
||||
|
||||
result = await hass.config_entries.options.async_configure(result["flow_id"])
|
||||
|
||||
assert result["type"] == FlowResultType.SHOW_PROGRESS
|
||||
assert result["step_id"] == "restore_nvm"
|
||||
|
||||
await hass.async_block_till_done()
|
||||
assert hass.config_entries.async_reload.called
|
||||
assert client.driver.controller.async_restore_nvm.call_count == 1
|
||||
assert len(events) == 2
|
||||
assert events[0].data["progress"] == 0.25
|
||||
assert events[1].data["progress"] == 0.75
|
||||
|
||||
result = await hass.config_entries.options.async_configure(result["flow_id"])
|
||||
|
||||
assert result["type"] == FlowResultType.CREATE_ENTRY
|
||||
assert integration.data["url"] == "ws://host1:3001"
|
||||
assert integration.data["usb_path"] == "/test"
|
||||
assert integration.data["use_addon"] is True
|
||||
|
||||
|
||||
async def test_options_migrate_backup_failure(
|
||||
hass: HomeAssistant, integration, client
|
||||
) -> None:
|
||||
"""Test backup failure."""
|
||||
entry = integration
|
||||
hass.config_entries.async_update_entry(
|
||||
entry, unique_id="1234", data={**entry.data, "use_addon": True}
|
||||
)
|
||||
|
||||
client.driver.controller.async_backup_nvm_raw = AsyncMock(
|
||||
side_effect=FailedCommand("test_error", "unknown_error")
|
||||
)
|
||||
|
||||
result = await hass.config_entries.options.async_init(entry.entry_id)
|
||||
|
||||
assert result["type"] == FlowResultType.MENU
|
||||
assert result["step_id"] == "init"
|
||||
|
||||
result = await hass.config_entries.options.async_configure(
|
||||
result["flow_id"], {"next_step_id": "intent_migrate"}
|
||||
)
|
||||
|
||||
assert result["type"] == FlowResultType.FORM
|
||||
assert result["step_id"] == "intent_migrate"
|
||||
|
||||
result = await hass.config_entries.options.async_configure(result["flow_id"], {})
|
||||
|
||||
assert result["type"] == FlowResultType.ABORT
|
||||
assert result["reason"] == "backup_failed"
|
||||
|
||||
|
||||
async def test_options_migrate_backup_file_failure(
|
||||
hass: HomeAssistant, integration, client
|
||||
) -> None:
|
||||
"""Test backup file failure."""
|
||||
entry = integration
|
||||
hass.config_entries.async_update_entry(
|
||||
entry, unique_id="1234", data={**entry.data, "use_addon": True}
|
||||
)
|
||||
|
||||
async def mock_backup_nvm_raw():
|
||||
await asyncio.sleep(0)
|
||||
return b"test_nvm_data"
|
||||
|
||||
client.driver.controller.async_backup_nvm_raw = AsyncMock(
|
||||
side_effect=mock_backup_nvm_raw
|
||||
)
|
||||
|
||||
result = await hass.config_entries.options.async_init(entry.entry_id)
|
||||
|
||||
assert result["type"] == FlowResultType.MENU
|
||||
assert result["step_id"] == "init"
|
||||
|
||||
result = await hass.config_entries.options.async_configure(
|
||||
result["flow_id"], {"next_step_id": "intent_migrate"}
|
||||
)
|
||||
|
||||
assert result["type"] == FlowResultType.FORM
|
||||
assert result["step_id"] == "intent_migrate"
|
||||
|
||||
result = await hass.config_entries.options.async_configure(result["flow_id"], {})
|
||||
|
||||
assert result["type"] == FlowResultType.SHOW_PROGRESS
|
||||
assert result["step_id"] == "backup_nvm"
|
||||
|
||||
with patch(
|
||||
"pathlib.Path.write_bytes", MagicMock(side_effect=OSError("test_error"))
|
||||
):
|
||||
await hass.async_block_till_done()
|
||||
assert client.driver.controller.async_backup_nvm_raw.call_count == 1
|
||||
|
||||
result = await hass.config_entries.options.async_configure(result["flow_id"])
|
||||
|
||||
assert result["type"] == FlowResultType.ABORT
|
||||
assert result["reason"] == "backup_failed"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"discovery_info",
|
||||
[
|
||||
[
|
||||
Discovery(
|
||||
addon="core_zwave_js",
|
||||
service="zwave_js",
|
||||
uuid=uuid4(),
|
||||
config=ADDON_DISCOVERY_INFO,
|
||||
)
|
||||
]
|
||||
],
|
||||
)
|
||||
async def test_options_migrate_restore_failure(
|
||||
hass: HomeAssistant,
|
||||
client,
|
||||
supervisor,
|
||||
integration,
|
||||
addon_running,
|
||||
restart_addon,
|
||||
set_addon_options,
|
||||
get_addon_discovery_info,
|
||||
) -> None:
|
||||
"""Test restore failure."""
|
||||
hass.config_entries.async_update_entry(
|
||||
integration, unique_id="1234", data={**integration.data, "use_addon": True}
|
||||
)
|
||||
|
||||
async def mock_backup_nvm_raw():
|
||||
await asyncio.sleep(0)
|
||||
return b"test_nvm_data"
|
||||
|
||||
client.driver.controller.async_backup_nvm_raw = AsyncMock(
|
||||
side_effect=mock_backup_nvm_raw
|
||||
)
|
||||
client.driver.controller.async_restore_nvm = AsyncMock(
|
||||
side_effect=FailedCommand("test_error", "unknown_error")
|
||||
)
|
||||
|
||||
result = await hass.config_entries.options.async_init(integration.entry_id)
|
||||
|
||||
assert result["type"] == FlowResultType.MENU
|
||||
assert result["step_id"] == "init"
|
||||
|
||||
result = await hass.config_entries.options.async_configure(
|
||||
result["flow_id"], {"next_step_id": "intent_migrate"}
|
||||
)
|
||||
|
||||
assert result["type"] == FlowResultType.FORM
|
||||
assert result["step_id"] == "intent_migrate"
|
||||
|
||||
result = await hass.config_entries.options.async_configure(result["flow_id"], {})
|
||||
|
||||
assert result["type"] == FlowResultType.SHOW_PROGRESS
|
||||
assert result["step_id"] == "backup_nvm"
|
||||
|
||||
with patch("pathlib.Path.write_bytes", MagicMock()) as mock_file:
|
||||
await hass.async_block_till_done()
|
||||
assert client.driver.controller.async_backup_nvm_raw.call_count == 1
|
||||
assert mock_file.call_count == 1
|
||||
|
||||
result = await hass.config_entries.options.async_configure(result["flow_id"])
|
||||
|
||||
assert result["type"] == FlowResultType.FORM
|
||||
assert result["step_id"] == "instruct_unplug"
|
||||
|
||||
result = await hass.config_entries.options.async_configure(result["flow_id"], {})
|
||||
|
||||
assert result["type"] == FlowResultType.FORM
|
||||
assert result["step_id"] == "choose_serial_port"
|
||||
|
||||
result = await hass.config_entries.options.async_configure(
|
||||
result["flow_id"],
|
||||
user_input={
|
||||
CONF_USB_PATH: "/test",
|
||||
},
|
||||
)
|
||||
|
||||
assert result["type"] == FlowResultType.SHOW_PROGRESS
|
||||
assert result["step_id"] == "start_addon"
|
||||
|
||||
await hass.async_block_till_done()
|
||||
result = await hass.config_entries.options.async_configure(result["flow_id"])
|
||||
|
||||
assert result["type"] == FlowResultType.SHOW_PROGRESS
|
||||
assert result["step_id"] == "restore_nvm"
|
||||
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert client.driver.controller.async_restore_nvm.call_count == 1
|
||||
|
||||
result = await hass.config_entries.options.async_configure(result["flow_id"])
|
||||
|
||||
assert result["type"] == FlowResultType.ABORT
|
||||
assert result["reason"] == "restore_failed"
|
||||
|
||||
|
||||
async def test_get_driver_failure(hass: HomeAssistant, integration, client) -> None:
|
||||
"""Test get driver failure."""
|
||||
|
||||
handler = OptionsFlowHandler()
|
||||
handler.hass = hass
|
||||
handler._config_entry = integration
|
||||
await hass.config_entries.async_unload(integration.entry_id)
|
||||
|
||||
with pytest.raises(data_entry_flow.AbortFlow):
|
||||
await handler._get_driver()
|
||||
|
||||
|
||||
async def test_hard_reset_failure(hass: HomeAssistant, integration, client) -> None:
|
||||
"""Test hard reset failure."""
|
||||
hass.config_entries.async_update_entry(
|
||||
integration, unique_id="1234", data={**integration.data, "use_addon": True}
|
||||
)
|
||||
|
||||
async def mock_backup_nvm_raw():
|
||||
await asyncio.sleep(0)
|
||||
return b"test_nvm_data"
|
||||
|
||||
client.driver.controller.async_backup_nvm_raw = AsyncMock(
|
||||
side_effect=mock_backup_nvm_raw
|
||||
)
|
||||
client.driver.async_hard_reset = AsyncMock(
|
||||
side_effect=FailedCommand("test_error", "unknown_error")
|
||||
)
|
||||
|
||||
result = await hass.config_entries.options.async_init(integration.entry_id)
|
||||
|
||||
assert result["type"] == FlowResultType.MENU
|
||||
assert result["step_id"] == "init"
|
||||
|
||||
result = await hass.config_entries.options.async_configure(
|
||||
result["flow_id"], {"next_step_id": "intent_migrate"}
|
||||
)
|
||||
|
||||
assert result["type"] == FlowResultType.FORM
|
||||
assert result["step_id"] == "intent_migrate"
|
||||
|
||||
result = await hass.config_entries.options.async_configure(result["flow_id"], {})
|
||||
|
||||
assert result["type"] == FlowResultType.SHOW_PROGRESS
|
||||
assert result["step_id"] == "backup_nvm"
|
||||
|
||||
with patch("pathlib.Path.write_bytes", MagicMock()) as mock_file:
|
||||
await hass.async_block_till_done()
|
||||
assert client.driver.controller.async_backup_nvm_raw.call_count == 1
|
||||
assert mock_file.call_count == 1
|
||||
|
||||
result = await hass.config_entries.options.async_configure(result["flow_id"])
|
||||
|
||||
assert result["type"] == FlowResultType.ABORT
|
||||
assert result["reason"] == "reset_failed"
|
||||
|
||||
|
||||
async def test_choose_serial_port_usb_ports_failure(
|
||||
hass: HomeAssistant, integration, client
|
||||
) -> None:
|
||||
"""Test choose serial port usb ports failure."""
|
||||
hass.config_entries.async_update_entry(
|
||||
integration, unique_id="1234", data={**integration.data, "use_addon": True}
|
||||
)
|
||||
|
||||
async def mock_backup_nvm_raw():
|
||||
await asyncio.sleep(0)
|
||||
return b"test_nvm_data"
|
||||
|
||||
client.driver.controller.async_backup_nvm_raw = AsyncMock(
|
||||
side_effect=mock_backup_nvm_raw
|
||||
)
|
||||
|
||||
result = await hass.config_entries.options.async_init(integration.entry_id)
|
||||
|
||||
assert result["type"] == FlowResultType.MENU
|
||||
assert result["step_id"] == "init"
|
||||
|
||||
result = await hass.config_entries.options.async_configure(
|
||||
result["flow_id"], {"next_step_id": "intent_migrate"}
|
||||
)
|
||||
|
||||
assert result["type"] == FlowResultType.FORM
|
||||
assert result["step_id"] == "intent_migrate"
|
||||
|
||||
result = await hass.config_entries.options.async_configure(result["flow_id"], {})
|
||||
|
||||
assert result["type"] == FlowResultType.SHOW_PROGRESS
|
||||
assert result["step_id"] == "backup_nvm"
|
||||
|
||||
with patch("pathlib.Path.write_bytes", MagicMock()) as mock_file:
|
||||
await hass.async_block_till_done()
|
||||
assert client.driver.controller.async_backup_nvm_raw.call_count == 1
|
||||
assert mock_file.call_count == 1
|
||||
|
||||
result = await hass.config_entries.options.async_configure(result["flow_id"])
|
||||
|
||||
assert result["type"] == FlowResultType.FORM
|
||||
assert result["step_id"] == "instruct_unplug"
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.zwave_js.config_flow.async_get_usb_ports",
|
||||
side_effect=OSError("test_error"),
|
||||
):
|
||||
result = await hass.config_entries.options.async_configure(
|
||||
result["flow_id"], {}
|
||||
)
|
||||
assert result["type"] == FlowResultType.ABORT
|
||||
assert result["reason"] == "usb_ports_failed"
|
||||
|
||||
|
||||
async def test_configure_addon_usb_ports_failure(
|
||||
hass: HomeAssistant, integration, addon_installed, supervisor
|
||||
) -> None:
|
||||
"""Test configure addon usb ports failure."""
|
||||
result = await hass.config_entries.options.async_init(integration.entry_id)
|
||||
|
||||
assert result["type"] == FlowResultType.MENU
|
||||
assert result["step_id"] == "init"
|
||||
|
||||
result = await hass.config_entries.options.async_configure(
|
||||
result["flow_id"], {"next_step_id": "intent_reconfigure"}
|
||||
)
|
||||
|
||||
assert result["type"] == FlowResultType.FORM
|
||||
assert result["step_id"] == "on_supervisor"
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.zwave_js.config_flow.async_get_usb_ports",
|
||||
side_effect=OSError("test_error"),
|
||||
):
|
||||
result = await hass.config_entries.options.async_configure(
|
||||
result["flow_id"], {"use_addon": True}
|
||||
)
|
||||
assert result["type"] == FlowResultType.ABORT
|
||||
assert result["reason"] == "usb_ports_failed"
|
||||
|
Loading…
x
Reference in New Issue
Block a user