mirror of
https://github.com/home-assistant/core.git
synced 2025-07-23 05:07:41 +00:00
Use UsbServiceInfo and ZeroconfServiceInfo in zha (#60266)
Co-authored-by: epenet <epenet@users.noreply.github.com>
This commit is contained in:
parent
d990fe1957
commit
3bf12fcd29
@ -8,9 +8,9 @@ import voluptuous as vol
|
|||||||
from zigpy.config import CONF_DEVICE, CONF_DEVICE_PATH
|
from zigpy.config import CONF_DEVICE, CONF_DEVICE_PATH
|
||||||
|
|
||||||
from homeassistant import config_entries
|
from homeassistant import config_entries
|
||||||
from homeassistant.components import usb
|
from homeassistant.components import usb, zeroconf
|
||||||
from homeassistant.const import CONF_HOST, CONF_NAME
|
from homeassistant.const import CONF_NAME
|
||||||
from homeassistant.helpers.typing import DiscoveryInfoType
|
from homeassistant.data_entry_flow import FlowResult
|
||||||
|
|
||||||
from .core.const import (
|
from .core.const import (
|
||||||
CONF_BAUDRATE,
|
CONF_BAUDRATE,
|
||||||
@ -94,14 +94,14 @@ class ZhaFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
|
|||||||
data_schema=vol.Schema(schema),
|
data_schema=vol.Schema(schema),
|
||||||
)
|
)
|
||||||
|
|
||||||
async def async_step_usb(self, discovery_info: DiscoveryInfoType):
|
async def async_step_usb(self, discovery_info: usb.UsbServiceInfo) -> FlowResult:
|
||||||
"""Handle usb discovery."""
|
"""Handle usb discovery."""
|
||||||
vid = discovery_info["vid"]
|
vid = discovery_info.vid
|
||||||
pid = discovery_info["pid"]
|
pid = discovery_info.pid
|
||||||
serial_number = discovery_info["serial_number"]
|
serial_number = discovery_info.serial_number
|
||||||
device = discovery_info["device"]
|
device = discovery_info.device
|
||||||
manufacturer = discovery_info["manufacturer"]
|
manufacturer = discovery_info.manufacturer
|
||||||
description = discovery_info["description"]
|
description = discovery_info.description
|
||||||
dev_path = await self.hass.async_add_executor_job(usb.get_serial_by_id, device)
|
dev_path = await self.hass.async_add_executor_job(usb.get_serial_by_id, device)
|
||||||
unique_id = f"{vid}:{pid}_{serial_number}_{manufacturer}_{description}"
|
unique_id = f"{vid}:{pid}_{serial_number}_{manufacturer}_{description}"
|
||||||
if current_entry := await self.async_set_unique_id(unique_id):
|
if current_entry := await self.async_set_unique_id(unique_id):
|
||||||
@ -159,12 +159,14 @@ class ZhaFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
|
|||||||
data_schema=vol.Schema({}),
|
data_schema=vol.Schema({}),
|
||||||
)
|
)
|
||||||
|
|
||||||
async def async_step_zeroconf(self, discovery_info: DiscoveryInfoType):
|
async def async_step_zeroconf(
|
||||||
|
self, discovery_info: zeroconf.ZeroconfServiceInfo
|
||||||
|
) -> FlowResult:
|
||||||
"""Handle zeroconf discovery."""
|
"""Handle zeroconf discovery."""
|
||||||
# Hostname is format: livingroom.local.
|
# Hostname is format: livingroom.local.
|
||||||
local_name = discovery_info["hostname"][:-1]
|
local_name = discovery_info.hostname[:-1]
|
||||||
node_name = local_name[: -len(".local")]
|
node_name = local_name[: -len(".local")]
|
||||||
host = discovery_info[CONF_HOST]
|
host = discovery_info.host
|
||||||
device_path = f"socket://{host}:6638"
|
device_path = f"socket://{host}:6638"
|
||||||
|
|
||||||
if current_entry := await self.async_set_unique_id(node_name):
|
if current_entry := await self.async_set_unique_id(node_name):
|
||||||
|
@ -8,6 +8,7 @@ import zigpy.config
|
|||||||
from zigpy.config import CONF_DEVICE, CONF_DEVICE_PATH
|
from zigpy.config import CONF_DEVICE, CONF_DEVICE_PATH
|
||||||
|
|
||||||
from homeassistant import config_entries
|
from homeassistant import config_entries
|
||||||
|
from homeassistant.components import usb, zeroconf
|
||||||
from homeassistant.components.ssdp import (
|
from homeassistant.components.ssdp import (
|
||||||
ATTR_SSDP_LOCATION,
|
ATTR_SSDP_LOCATION,
|
||||||
ATTR_UPNP_MANUFACTURER_URL,
|
ATTR_UPNP_MANUFACTURER_URL,
|
||||||
@ -52,12 +53,14 @@ def com_port():
|
|||||||
@patch("zigpy_znp.zigbee.application.ControllerApplication.probe", return_value=True)
|
@patch("zigpy_znp.zigbee.application.ControllerApplication.probe", return_value=True)
|
||||||
async def test_discovery(detect_mock, hass):
|
async def test_discovery(detect_mock, hass):
|
||||||
"""Test zeroconf flow -- radio detected."""
|
"""Test zeroconf flow -- radio detected."""
|
||||||
service_info = {
|
service_info = zeroconf.ZeroconfServiceInfo(
|
||||||
"host": "192.168.1.200",
|
host="192.168.1.200",
|
||||||
"port": 6053,
|
hostname="_tube_zb_gw._tcp.local.",
|
||||||
"hostname": "_tube_zb_gw._tcp.local.",
|
name="mock_name",
|
||||||
"properties": {"name": "tube_123456"},
|
port=6053,
|
||||||
}
|
properties={"name": "tube_123456"},
|
||||||
|
type="mock_type",
|
||||||
|
)
|
||||||
flow = await hass.config_entries.flow.async_init(
|
flow = await hass.config_entries.flow.async_init(
|
||||||
"zha", context={"source": SOURCE_ZEROCONF}, data=service_info
|
"zha", context={"source": SOURCE_ZEROCONF}, data=service_info
|
||||||
)
|
)
|
||||||
@ -94,12 +97,14 @@ async def test_discovery_via_zeroconf_ip_change(detect_mock, hass):
|
|||||||
)
|
)
|
||||||
entry.add_to_hass(hass)
|
entry.add_to_hass(hass)
|
||||||
|
|
||||||
service_info = {
|
service_info = zeroconf.ZeroconfServiceInfo(
|
||||||
"host": "192.168.1.22",
|
host="192.168.1.22",
|
||||||
"port": 6053,
|
hostname="tube_zb_gw_cc2652p2_poe.local.",
|
||||||
"hostname": "tube_zb_gw_cc2652p2_poe.local.",
|
name="mock_name",
|
||||||
"properties": {"address": "tube_zb_gw_cc2652p2_poe.local"},
|
port=6053,
|
||||||
}
|
properties={"address": "tube_zb_gw_cc2652p2_poe.local"},
|
||||||
|
type="mock_type",
|
||||||
|
)
|
||||||
result = await hass.config_entries.flow.async_init(
|
result = await hass.config_entries.flow.async_init(
|
||||||
"zha", context={"source": SOURCE_ZEROCONF}, data=service_info
|
"zha", context={"source": SOURCE_ZEROCONF}, data=service_info
|
||||||
)
|
)
|
||||||
@ -124,12 +129,14 @@ async def test_discovery_via_zeroconf_ip_change_ignored(detect_mock, hass):
|
|||||||
)
|
)
|
||||||
entry.add_to_hass(hass)
|
entry.add_to_hass(hass)
|
||||||
|
|
||||||
service_info = {
|
service_info = zeroconf.ZeroconfServiceInfo(
|
||||||
"host": "192.168.1.22",
|
host="192.168.1.22",
|
||||||
"port": 6053,
|
hostname="tube_zb_gw_cc2652p2_poe.local.",
|
||||||
"hostname": "tube_zb_gw_cc2652p2_poe.local.",
|
name="mock_name",
|
||||||
"properties": {"address": "tube_zb_gw_cc2652p2_poe.local"},
|
port=6053,
|
||||||
}
|
properties={"address": "tube_zb_gw_cc2652p2_poe.local"},
|
||||||
|
type="mock_type",
|
||||||
|
)
|
||||||
result = await hass.config_entries.flow.async_init(
|
result = await hass.config_entries.flow.async_init(
|
||||||
"zha", context={"source": SOURCE_ZEROCONF}, data=service_info
|
"zha", context={"source": SOURCE_ZEROCONF}, data=service_info
|
||||||
)
|
)
|
||||||
@ -144,14 +151,14 @@ async def test_discovery_via_zeroconf_ip_change_ignored(detect_mock, hass):
|
|||||||
@patch("zigpy_znp.zigbee.application.ControllerApplication.probe", return_value=True)
|
@patch("zigpy_znp.zigbee.application.ControllerApplication.probe", return_value=True)
|
||||||
async def test_discovery_via_usb(detect_mock, hass):
|
async def test_discovery_via_usb(detect_mock, hass):
|
||||||
"""Test usb flow -- radio detected."""
|
"""Test usb flow -- radio detected."""
|
||||||
discovery_info = {
|
discovery_info = usb.UsbServiceInfo(
|
||||||
"device": "/dev/ttyZIGBEE",
|
device="/dev/ttyZIGBEE",
|
||||||
"pid": "AAAA",
|
pid="AAAA",
|
||||||
"vid": "AAAA",
|
vid="AAAA",
|
||||||
"serial_number": "1234",
|
serial_number="1234",
|
||||||
"description": "zigbee radio",
|
description="zigbee radio",
|
||||||
"manufacturer": "test",
|
manufacturer="test",
|
||||||
}
|
)
|
||||||
result = await hass.config_entries.flow.async_init(
|
result = await hass.config_entries.flow.async_init(
|
||||||
"zha", context={"source": SOURCE_USB}, data=discovery_info
|
"zha", context={"source": SOURCE_USB}, data=discovery_info
|
||||||
)
|
)
|
||||||
@ -180,14 +187,14 @@ async def test_discovery_via_usb(detect_mock, hass):
|
|||||||
@patch("zigpy_znp.zigbee.application.ControllerApplication.probe", return_value=False)
|
@patch("zigpy_znp.zigbee.application.ControllerApplication.probe", return_value=False)
|
||||||
async def test_discovery_via_usb_no_radio(detect_mock, hass):
|
async def test_discovery_via_usb_no_radio(detect_mock, hass):
|
||||||
"""Test usb flow -- no radio detected."""
|
"""Test usb flow -- no radio detected."""
|
||||||
discovery_info = {
|
discovery_info = usb.UsbServiceInfo(
|
||||||
"device": "/dev/null",
|
device="/dev/null",
|
||||||
"pid": "AAAA",
|
pid="AAAA",
|
||||||
"vid": "AAAA",
|
vid="AAAA",
|
||||||
"serial_number": "1234",
|
serial_number="1234",
|
||||||
"description": "zigbee radio",
|
description="zigbee radio",
|
||||||
"manufacturer": "test",
|
manufacturer="test",
|
||||||
}
|
)
|
||||||
result = await hass.config_entries.flow.async_init(
|
result = await hass.config_entries.flow.async_init(
|
||||||
"zha", context={"source": SOURCE_USB}, data=discovery_info
|
"zha", context={"source": SOURCE_USB}, data=discovery_info
|
||||||
)
|
)
|
||||||
@ -213,14 +220,14 @@ async def test_discovery_via_usb_already_setup(detect_mock, hass):
|
|||||||
domain=DOMAIN, data={CONF_DEVICE: {CONF_DEVICE_PATH: "/dev/ttyUSB1"}}
|
domain=DOMAIN, data={CONF_DEVICE: {CONF_DEVICE_PATH: "/dev/ttyUSB1"}}
|
||||||
).add_to_hass(hass)
|
).add_to_hass(hass)
|
||||||
|
|
||||||
discovery_info = {
|
discovery_info = usb.UsbServiceInfo(
|
||||||
"device": "/dev/ttyZIGBEE",
|
device="/dev/ttyZIGBEE",
|
||||||
"pid": "AAAA",
|
pid="AAAA",
|
||||||
"vid": "AAAA",
|
vid="AAAA",
|
||||||
"serial_number": "1234",
|
serial_number="1234",
|
||||||
"description": "zigbee radio",
|
description="zigbee radio",
|
||||||
"manufacturer": "test",
|
manufacturer="test",
|
||||||
}
|
)
|
||||||
result = await hass.config_entries.flow.async_init(
|
result = await hass.config_entries.flow.async_init(
|
||||||
"zha", context={"source": SOURCE_USB}, data=discovery_info
|
"zha", context={"source": SOURCE_USB}, data=discovery_info
|
||||||
)
|
)
|
||||||
@ -247,14 +254,14 @@ async def test_discovery_via_usb_path_changes(hass):
|
|||||||
)
|
)
|
||||||
entry.add_to_hass(hass)
|
entry.add_to_hass(hass)
|
||||||
|
|
||||||
discovery_info = {
|
discovery_info = usb.UsbServiceInfo(
|
||||||
"device": "/dev/ttyZIGBEE",
|
device="/dev/ttyZIGBEE",
|
||||||
"pid": "AAAA",
|
pid="AAAA",
|
||||||
"vid": "AAAA",
|
vid="AAAA",
|
||||||
"serial_number": "1234",
|
serial_number="1234",
|
||||||
"description": "zigbee radio",
|
description="zigbee radio",
|
||||||
"manufacturer": "test",
|
manufacturer="test",
|
||||||
}
|
)
|
||||||
result = await hass.config_entries.flow.async_init(
|
result = await hass.config_entries.flow.async_init(
|
||||||
"zha", context={"source": SOURCE_USB}, data=discovery_info
|
"zha", context={"source": SOURCE_USB}, data=discovery_info
|
||||||
)
|
)
|
||||||
@ -282,14 +289,14 @@ async def test_discovery_via_usb_deconz_already_discovered(detect_mock, hass):
|
|||||||
context={"source": SOURCE_SSDP},
|
context={"source": SOURCE_SSDP},
|
||||||
)
|
)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
discovery_info = {
|
discovery_info = usb.UsbServiceInfo(
|
||||||
"device": "/dev/ttyZIGBEE",
|
device="/dev/ttyZIGBEE",
|
||||||
"pid": "AAAA",
|
pid="AAAA",
|
||||||
"vid": "AAAA",
|
vid="AAAA",
|
||||||
"serial_number": "1234",
|
serial_number="1234",
|
||||||
"description": "zigbee radio",
|
description="zigbee radio",
|
||||||
"manufacturer": "test",
|
manufacturer="test",
|
||||||
}
|
)
|
||||||
result = await hass.config_entries.flow.async_init(
|
result = await hass.config_entries.flow.async_init(
|
||||||
"zha", context={"source": SOURCE_USB}, data=discovery_info
|
"zha", context={"source": SOURCE_USB}, data=discovery_info
|
||||||
)
|
)
|
||||||
@ -304,14 +311,14 @@ async def test_discovery_via_usb_deconz_already_setup(detect_mock, hass):
|
|||||||
"""Test usb flow -- deconz setup."""
|
"""Test usb flow -- deconz setup."""
|
||||||
MockConfigEntry(domain="deconz", data={}).add_to_hass(hass)
|
MockConfigEntry(domain="deconz", data={}).add_to_hass(hass)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
discovery_info = {
|
discovery_info = usb.UsbServiceInfo(
|
||||||
"device": "/dev/ttyZIGBEE",
|
device="/dev/ttyZIGBEE",
|
||||||
"pid": "AAAA",
|
pid="AAAA",
|
||||||
"vid": "AAAA",
|
vid="AAAA",
|
||||||
"serial_number": "1234",
|
serial_number="1234",
|
||||||
"description": "zigbee radio",
|
description="zigbee radio",
|
||||||
"manufacturer": "test",
|
manufacturer="test",
|
||||||
}
|
)
|
||||||
result = await hass.config_entries.flow.async_init(
|
result = await hass.config_entries.flow.async_init(
|
||||||
"zha", context={"source": SOURCE_USB}, data=discovery_info
|
"zha", context={"source": SOURCE_USB}, data=discovery_info
|
||||||
)
|
)
|
||||||
@ -328,14 +335,14 @@ async def test_discovery_via_usb_deconz_ignored(detect_mock, hass):
|
|||||||
domain="deconz", source=config_entries.SOURCE_IGNORE, data={}
|
domain="deconz", source=config_entries.SOURCE_IGNORE, data={}
|
||||||
).add_to_hass(hass)
|
).add_to_hass(hass)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
discovery_info = {
|
discovery_info = usb.UsbServiceInfo(
|
||||||
"device": "/dev/ttyZIGBEE",
|
device="/dev/ttyZIGBEE",
|
||||||
"pid": "AAAA",
|
pid="AAAA",
|
||||||
"vid": "AAAA",
|
vid="AAAA",
|
||||||
"serial_number": "1234",
|
serial_number="1234",
|
||||||
"description": "zigbee radio",
|
description="zigbee radio",
|
||||||
"manufacturer": "test",
|
manufacturer="test",
|
||||||
}
|
)
|
||||||
result = await hass.config_entries.flow.async_init(
|
result = await hass.config_entries.flow.async_init(
|
||||||
"zha", context={"source": SOURCE_USB}, data=discovery_info
|
"zha", context={"source": SOURCE_USB}, data=discovery_info
|
||||||
)
|
)
|
||||||
@ -356,14 +363,14 @@ async def test_discovery_via_usb_zha_ignored_updates(detect_mock, hass):
|
|||||||
)
|
)
|
||||||
entry.add_to_hass(hass)
|
entry.add_to_hass(hass)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
discovery_info = {
|
discovery_info = usb.UsbServiceInfo(
|
||||||
"device": "/dev/ttyZIGBEE",
|
device="/dev/ttyZIGBEE",
|
||||||
"pid": "AAAA",
|
pid="AAAA",
|
||||||
"vid": "AAAA",
|
vid="AAAA",
|
||||||
"serial_number": "1234",
|
serial_number="1234",
|
||||||
"description": "zigbee radio",
|
description="zigbee radio",
|
||||||
"manufacturer": "test",
|
manufacturer="test",
|
||||||
}
|
)
|
||||||
result = await hass.config_entries.flow.async_init(
|
result = await hass.config_entries.flow.async_init(
|
||||||
"zha", context={"source": SOURCE_USB}, data=discovery_info
|
"zha", context={"source": SOURCE_USB}, data=discovery_info
|
||||||
)
|
)
|
||||||
@ -380,12 +387,14 @@ async def test_discovery_via_usb_zha_ignored_updates(detect_mock, hass):
|
|||||||
@patch("zigpy_znp.zigbee.application.ControllerApplication.probe", return_value=True)
|
@patch("zigpy_znp.zigbee.application.ControllerApplication.probe", return_value=True)
|
||||||
async def test_discovery_already_setup(detect_mock, hass):
|
async def test_discovery_already_setup(detect_mock, hass):
|
||||||
"""Test zeroconf flow -- radio detected."""
|
"""Test zeroconf flow -- radio detected."""
|
||||||
service_info = {
|
service_info = zeroconf.ZeroconfServiceInfo(
|
||||||
"host": "192.168.1.200",
|
host="192.168.1.200",
|
||||||
"port": 6053,
|
hostname="_tube_zb_gw._tcp.local.",
|
||||||
"hostname": "_tube_zb_gw._tcp.local.",
|
name="mock_name",
|
||||||
"properties": {"name": "tube_123456"},
|
port=6053,
|
||||||
}
|
properties={"name": "tube_123456"},
|
||||||
|
type="mock_type",
|
||||||
|
)
|
||||||
|
|
||||||
MockConfigEntry(
|
MockConfigEntry(
|
||||||
domain=DOMAIN, data={CONF_DEVICE: {CONF_DEVICE_PATH: "/dev/ttyUSB1"}}
|
domain=DOMAIN, data={CONF_DEVICE: {CONF_DEVICE_PATH: "/dev/ttyUSB1"}}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user