From a89057ece5e268ca7dd4d2c08296145b46cbf697 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Thu, 26 Aug 2021 08:59:02 -0500 Subject: [PATCH] Limit USB discovery to specific manufacturer/description/serial_number matches (#55236) * Limit USB discovery to specific manufacturer/description/serial_number matches * test for None case --- homeassistant/components/usb/__init__.py | 20 ++ homeassistant/components/zha/manifest.json | 7 +- homeassistant/generated/usb.py | 14 +- script/hassfest/manifest.py | 3 + tests/components/usb/test_init.py | 293 ++++++++++++++++++++- 5 files changed, 324 insertions(+), 13 deletions(-) diff --git a/homeassistant/components/usb/__init__.py b/homeassistant/components/usb/__init__.py index 3aaccc15a64..d02c01ad03d 100644 --- a/homeassistant/components/usb/__init__.py +++ b/homeassistant/components/usb/__init__.py @@ -2,6 +2,7 @@ from __future__ import annotations import dataclasses +import fnmatch import logging import os import sys @@ -72,6 +73,13 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: return True +def _fnmatch_lower(name: str | None, pattern: str) -> bool: + """Match a lowercase version of the name.""" + if name is None: + return False + return fnmatch.fnmatch(name.lower(), pattern) + + class USBDiscovery: """Manage USB Discovery.""" @@ -152,6 +160,18 @@ class USBDiscovery: continue if "pid" in matcher and device.pid != matcher["pid"]: continue + if "serial_number" in matcher and not _fnmatch_lower( + device.serial_number, matcher["serial_number"] + ): + continue + if "manufacturer" in matcher and not _fnmatch_lower( + device.manufacturer, matcher["manufacturer"] + ): + continue + if "description" in matcher and not _fnmatch_lower( + device.description, matcher["description"] + ): + continue flow: USBFlow = { "domain": matcher["domain"], "context": {"source": config_entries.SOURCE_USB}, diff --git a/homeassistant/components/zha/manifest.json b/homeassistant/components/zha/manifest.json index 93d9816d339..2c1d625b7fe 100644 --- a/homeassistant/components/zha/manifest.json +++ b/homeassistant/components/zha/manifest.json @@ -16,10 +16,9 @@ "zigpy-znp==0.5.3" ], "usb": [ - {"vid":"10C4","pid":"EA60","known_devices":["slae.sh cc2652rb stick"]}, - {"vid":"1CF1","pid":"0030","known_devices":["Conbee II"]}, - {"vid":"1A86","pid":"7523","known_devices":["Electrolama zig-a-zig-ah"]}, - {"vid":"10C4","pid":"8A2A","known_devices":["Nortek HUSBZB-1"]} + {"vid":"10C4","pid":"EA60","description":"*2652*","known_devices":["slae.sh cc2652rb stick"]}, + {"vid":"1CF1","pid":"0030","description":"*conbee*","known_devices":["Conbee II"]}, + {"vid":"10C4","pid":"8A2A","description":"*zigbee*","known_devices":["Nortek HUSBZB-1"]} ], "codeowners": ["@dmulcahey", "@adminiuga"], "zeroconf": [ diff --git a/homeassistant/generated/usb.py b/homeassistant/generated/usb.py index cb672c736b2..477a762ae62 100644 --- a/homeassistant/generated/usb.py +++ b/homeassistant/generated/usb.py @@ -9,22 +9,20 @@ USB = [ { "domain": "zha", "vid": "10C4", - "pid": "EA60" + "pid": "EA60", + "description": "*2652*" }, { "domain": "zha", "vid": "1CF1", - "pid": "0030" - }, - { - "domain": "zha", - "vid": "1A86", - "pid": "7523" + "pid": "0030", + "description": "*conbee*" }, { "domain": "zha", "vid": "10C4", - "pid": "8A2A" + "pid": "8A2A", + "description": "*zigbee*" }, { "domain": "zwave_js", diff --git a/script/hassfest/manifest.py b/script/hassfest/manifest.py index 8c9776ed7c9..abade24dbf9 100644 --- a/script/hassfest/manifest.py +++ b/script/hassfest/manifest.py @@ -210,6 +210,9 @@ MANIFEST_SCHEMA = vol.Schema( { vol.Optional("vid"): vol.All(str, verify_uppercase), vol.Optional("pid"): vol.All(str, verify_uppercase), + vol.Optional("serial_number"): vol.All(str, verify_lowercase), + vol.Optional("manufacturer"): vol.All(str, verify_lowercase), + vol.Optional("description"): vol.All(str, verify_lowercase), vol.Optional("known_devices"): [str], } ) diff --git a/tests/components/usb/test_init.py b/tests/components/usb/test_init.py index 9c480f11fc6..e22e514f230 100644 --- a/tests/components/usb/test_init.py +++ b/tests/components/usb/test_init.py @@ -9,7 +9,7 @@ from homeassistant.components import usb from homeassistant.const import EVENT_HOMEASSISTANT_STARTED from homeassistant.setup import async_setup_component -from . import slae_sh_device +from . import conbee_device, slae_sh_device @pytest.fixture(name="operating_system") @@ -171,6 +171,297 @@ async def test_discovered_by_websocket_scan(hass, hass_ws_client): assert mock_config_flow.mock_calls[0][1][0] == "test1" +async def test_discovered_by_websocket_scan_limited_by_description_matcher( + hass, hass_ws_client +): + """Test a device is discovered from websocket scan is limited by the description matcher.""" + new_usb = [ + {"domain": "test1", "vid": "3039", "pid": "3039", "description": "*2652*"} + ] + + mock_comports = [ + MagicMock( + device=slae_sh_device.device, + vid=12345, + pid=12345, + serial_number=slae_sh_device.serial_number, + manufacturer=slae_sh_device.manufacturer, + description=slae_sh_device.description, + ) + ] + + with patch("pyudev.Context", side_effect=ImportError), patch( + "homeassistant.components.usb.async_get_usb", return_value=new_usb + ), patch( + "homeassistant.components.usb.comports", return_value=mock_comports + ), patch.object( + hass.config_entries.flow, "async_init" + ) as mock_config_flow: + assert await async_setup_component(hass, "usb", {"usb": {}}) + await hass.async_block_till_done() + hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED) + await hass.async_block_till_done() + ws_client = await hass_ws_client(hass) + await ws_client.send_json({"id": 1, "type": "usb/scan"}) + response = await ws_client.receive_json() + assert response["success"] + await hass.async_block_till_done() + + assert len(mock_config_flow.mock_calls) == 1 + assert mock_config_flow.mock_calls[0][1][0] == "test1" + + +async def test_discovered_by_websocket_scan_rejected_by_description_matcher( + hass, hass_ws_client +): + """Test a device is discovered from websocket scan rejected by the description matcher.""" + new_usb = [ + {"domain": "test1", "vid": "3039", "pid": "3039", "description": "*not_it*"} + ] + + mock_comports = [ + MagicMock( + device=slae_sh_device.device, + vid=12345, + pid=12345, + serial_number=slae_sh_device.serial_number, + manufacturer=slae_sh_device.manufacturer, + description=slae_sh_device.description, + ) + ] + + with patch("pyudev.Context", side_effect=ImportError), patch( + "homeassistant.components.usb.async_get_usb", return_value=new_usb + ), patch( + "homeassistant.components.usb.comports", return_value=mock_comports + ), patch.object( + hass.config_entries.flow, "async_init" + ) as mock_config_flow: + assert await async_setup_component(hass, "usb", {"usb": {}}) + await hass.async_block_till_done() + hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED) + await hass.async_block_till_done() + ws_client = await hass_ws_client(hass) + await ws_client.send_json({"id": 1, "type": "usb/scan"}) + response = await ws_client.receive_json() + assert response["success"] + await hass.async_block_till_done() + + assert len(mock_config_flow.mock_calls) == 0 + + +async def test_discovered_by_websocket_scan_limited_by_serial_number_matcher( + hass, hass_ws_client +): + """Test a device is discovered from websocket scan is limited by the serial_number matcher.""" + new_usb = [ + { + "domain": "test1", + "vid": "3039", + "pid": "3039", + "serial_number": "00_12_4b_00*", + } + ] + + mock_comports = [ + MagicMock( + device=slae_sh_device.device, + vid=12345, + pid=12345, + serial_number=slae_sh_device.serial_number, + manufacturer=slae_sh_device.manufacturer, + description=slae_sh_device.description, + ) + ] + + with patch("pyudev.Context", side_effect=ImportError), patch( + "homeassistant.components.usb.async_get_usb", return_value=new_usb + ), patch( + "homeassistant.components.usb.comports", return_value=mock_comports + ), patch.object( + hass.config_entries.flow, "async_init" + ) as mock_config_flow: + assert await async_setup_component(hass, "usb", {"usb": {}}) + await hass.async_block_till_done() + hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED) + await hass.async_block_till_done() + ws_client = await hass_ws_client(hass) + await ws_client.send_json({"id": 1, "type": "usb/scan"}) + response = await ws_client.receive_json() + assert response["success"] + await hass.async_block_till_done() + + assert len(mock_config_flow.mock_calls) == 1 + assert mock_config_flow.mock_calls[0][1][0] == "test1" + + +async def test_discovered_by_websocket_scan_rejected_by_serial_number_matcher( + hass, hass_ws_client +): + """Test a device is discovered from websocket scan is rejected by the serial_number matcher.""" + new_usb = [ + {"domain": "test1", "vid": "3039", "pid": "3039", "serial_number": "123*"} + ] + + mock_comports = [ + MagicMock( + device=slae_sh_device.device, + vid=12345, + pid=12345, + serial_number=slae_sh_device.serial_number, + manufacturer=slae_sh_device.manufacturer, + description=slae_sh_device.description, + ) + ] + + with patch("pyudev.Context", side_effect=ImportError), patch( + "homeassistant.components.usb.async_get_usb", return_value=new_usb + ), patch( + "homeassistant.components.usb.comports", return_value=mock_comports + ), patch.object( + hass.config_entries.flow, "async_init" + ) as mock_config_flow: + assert await async_setup_component(hass, "usb", {"usb": {}}) + await hass.async_block_till_done() + hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED) + await hass.async_block_till_done() + ws_client = await hass_ws_client(hass) + await ws_client.send_json({"id": 1, "type": "usb/scan"}) + response = await ws_client.receive_json() + assert response["success"] + await hass.async_block_till_done() + + assert len(mock_config_flow.mock_calls) == 0 + + +async def test_discovered_by_websocket_scan_limited_by_manufacturer_matcher( + hass, hass_ws_client +): + """Test a device is discovered from websocket scan is limited by the manufacturer matcher.""" + new_usb = [ + { + "domain": "test1", + "vid": "3039", + "pid": "3039", + "manufacturer": "dresden elektronik ingenieurtechnik*", + } + ] + + mock_comports = [ + MagicMock( + device=conbee_device.device, + vid=12345, + pid=12345, + serial_number=conbee_device.serial_number, + manufacturer=conbee_device.manufacturer, + description=conbee_device.description, + ) + ] + + with patch("pyudev.Context", side_effect=ImportError), patch( + "homeassistant.components.usb.async_get_usb", return_value=new_usb + ), patch( + "homeassistant.components.usb.comports", return_value=mock_comports + ), patch.object( + hass.config_entries.flow, "async_init" + ) as mock_config_flow: + assert await async_setup_component(hass, "usb", {"usb": {}}) + await hass.async_block_till_done() + hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED) + await hass.async_block_till_done() + ws_client = await hass_ws_client(hass) + await ws_client.send_json({"id": 1, "type": "usb/scan"}) + response = await ws_client.receive_json() + assert response["success"] + await hass.async_block_till_done() + + assert len(mock_config_flow.mock_calls) == 1 + assert mock_config_flow.mock_calls[0][1][0] == "test1" + + +async def test_discovered_by_websocket_scan_rejected_by_manufacturer_matcher( + hass, hass_ws_client +): + """Test a device is discovered from websocket scan is rejected by the manufacturer matcher.""" + new_usb = [ + { + "domain": "test1", + "vid": "3039", + "pid": "3039", + "manufacturer": "other vendor*", + } + ] + + mock_comports = [ + MagicMock( + device=conbee_device.device, + vid=12345, + pid=12345, + serial_number=conbee_device.serial_number, + manufacturer=conbee_device.manufacturer, + description=conbee_device.description, + ) + ] + + with patch("pyudev.Context", side_effect=ImportError), patch( + "homeassistant.components.usb.async_get_usb", return_value=new_usb + ), patch( + "homeassistant.components.usb.comports", return_value=mock_comports + ), patch.object( + hass.config_entries.flow, "async_init" + ) as mock_config_flow: + assert await async_setup_component(hass, "usb", {"usb": {}}) + await hass.async_block_till_done() + hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED) + await hass.async_block_till_done() + ws_client = await hass_ws_client(hass) + await ws_client.send_json({"id": 1, "type": "usb/scan"}) + response = await ws_client.receive_json() + assert response["success"] + await hass.async_block_till_done() + + assert len(mock_config_flow.mock_calls) == 0 + + +async def test_discovered_by_websocket_rejected_with_empty_serial_number_only( + hass, hass_ws_client +): + """Test a device is discovered from websocket is rejected with empty serial number.""" + new_usb = [ + {"domain": "test1", "vid": "3039", "pid": "3039", "serial_number": "123*"} + ] + + mock_comports = [ + MagicMock( + device=conbee_device.device, + vid=12345, + pid=12345, + serial_number=None, + manufacturer=None, + description=None, + ) + ] + + with patch("pyudev.Context", side_effect=ImportError), patch( + "homeassistant.components.usb.async_get_usb", return_value=new_usb + ), patch( + "homeassistant.components.usb.comports", return_value=mock_comports + ), patch.object( + hass.config_entries.flow, "async_init" + ) as mock_config_flow: + assert await async_setup_component(hass, "usb", {"usb": {}}) + await hass.async_block_till_done() + hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED) + await hass.async_block_till_done() + ws_client = await hass_ws_client(hass) + await ws_client.send_json({"id": 1, "type": "usb/scan"}) + response = await ws_client.receive_json() + assert response["success"] + await hass.async_block_till_done() + + assert len(mock_config_flow.mock_calls) == 0 + + async def test_discovered_by_websocket_scan_match_vid_only(hass, hass_ws_client): """Test a device is discovered from websocket scan only matching vid.""" new_usb = [{"domain": "test1", "vid": "3039"}]