Speed-up wemo discovery (#46821)

* Speed-up wemo discovery

* Use gather_with_concurrency to limit concurrent executor usage

* Comment fixup: asyncio executors -> executor threads
This commit is contained in:
Eric Severance 2021-02-20 13:16:50 -08:00 committed by GitHub
parent 43a5852561
commit 6e52b26c06
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 64 additions and 17 deletions

View File

@ -1,5 +1,4 @@
"""Support for WeMo device discovery.""" """Support for WeMo device discovery."""
import asyncio
import logging import logging
import pywemo import pywemo
@ -16,9 +15,14 @@ from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import config_validation as cv from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.dispatcher import async_dispatcher_send from homeassistant.helpers.dispatcher import async_dispatcher_send
from homeassistant.helpers.event import async_call_later from homeassistant.helpers.event import async_call_later
from homeassistant.util.async_ import gather_with_concurrency
from .const import DOMAIN from .const import DOMAIN
# Max number of devices to initialize at once. This limit is in place to
# avoid tying up too many executor threads with WeMo device setup.
MAX_CONCURRENCY = 3
# Mapping from Wemo model_name to domain. # Mapping from Wemo model_name to domain.
WEMO_MODEL_DISPATCH = { WEMO_MODEL_DISPATCH = {
"Bridge": LIGHT_DOMAIN, "Bridge": LIGHT_DOMAIN,
@ -114,11 +118,12 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry):
static_conf = config.get(CONF_STATIC, []) static_conf = config.get(CONF_STATIC, [])
if static_conf: if static_conf:
_LOGGER.debug("Adding statically configured WeMo devices...") _LOGGER.debug("Adding statically configured WeMo devices...")
for device in await asyncio.gather( for device in await gather_with_concurrency(
MAX_CONCURRENCY,
*[ *[
hass.async_add_executor_job(validate_static_config, host, port) hass.async_add_executor_job(validate_static_config, host, port)
for host, port in static_conf for host, port in static_conf
] ],
): ):
if device: if device:
wemo_dispatcher.async_add_unique_device(hass, device) wemo_dispatcher.async_add_unique_device(hass, device)
@ -187,15 +192,44 @@ class WemoDiscovery:
self._wemo_dispatcher = wemo_dispatcher self._wemo_dispatcher = wemo_dispatcher
self._stop = None self._stop = None
self._scan_delay = 0 self._scan_delay = 0
self._upnp_entries = set()
async def async_add_from_upnp_entry(self, entry: pywemo.ssdp.UPNPEntry) -> None:
"""Create a WeMoDevice from an UPNPEntry and add it to the dispatcher.
Uses the self._upnp_entries set to avoid interrogating the same device
multiple times.
"""
if entry in self._upnp_entries:
return
try:
device = await self._hass.async_add_executor_job(
pywemo.discovery.device_from_uuid_and_location,
entry.udn,
entry.location,
)
except pywemo.PyWeMoException as err:
_LOGGER.error("Unable to setup WeMo %r (%s)", entry, err)
else:
self._wemo_dispatcher.async_add_unique_device(self._hass, device)
self._upnp_entries.add(entry)
async def async_discover_and_schedule(self, *_) -> None: async def async_discover_and_schedule(self, *_) -> None:
"""Periodically scan the network looking for WeMo devices.""" """Periodically scan the network looking for WeMo devices."""
_LOGGER.debug("Scanning network for WeMo devices...") _LOGGER.debug("Scanning network for WeMo devices...")
try: try:
for device in await self._hass.async_add_executor_job( # pywemo.ssdp.scan is a light-weight UDP UPnP scan for WeMo devices.
pywemo.discover_devices entries = await self._hass.async_add_executor_job(pywemo.ssdp.scan)
):
self._wemo_dispatcher.async_add_unique_device(self._hass, device) # async_add_from_upnp_entry causes multiple HTTP requests to be sent
# to the WeMo device for the initial setup of the WeMoDevice
# instance. This may take some time to complete. The per-device
# setup work is done in parallel to speed up initial setup for the
# component.
await gather_with_concurrency(
MAX_CONCURRENCY,
*[self.async_add_from_upnp_entry(entry) for entry in entries],
)
finally: finally:
# Run discovery more frequently after hass has just started. # Run discovery more frequently after hass has just started.
self._scan_delay = min( self._scan_delay = min(

View File

@ -100,28 +100,41 @@ async def test_static_config_with_invalid_host(hass):
async def test_discovery(hass, pywemo_registry): async def test_discovery(hass, pywemo_registry):
"""Verify that discovery dispatches devices to the platform for setup.""" """Verify that discovery dispatches devices to the platform for setup."""
def create_device(counter): def create_device(uuid, location):
"""Create a unique mock Motion detector device for each counter value.""" """Create a unique mock Motion detector device for each counter value."""
device = create_autospec(pywemo.Motion, instance=True) device = create_autospec(pywemo.Motion, instance=True)
device.host = f"{MOCK_HOST}_{counter}" device.host = location
device.port = MOCK_PORT + counter device.port = MOCK_PORT
device.name = f"{MOCK_NAME}_{counter}" device.name = f"{MOCK_NAME}_{uuid}"
device.serialnumber = f"{MOCK_SERIAL_NUMBER}_{counter}" device.serialnumber = f"{MOCK_SERIAL_NUMBER}_{uuid}"
device.model_name = "Motion" device.model_name = "Motion"
device.get_state.return_value = 0 # Default to Off device.get_state.return_value = 0 # Default to Off
return device return device
pywemo_devices = [create_device(0), create_device(1)] def create_upnp_entry(counter):
return pywemo.ssdp.UPNPEntry.from_response(
"\r\n".join(
[
"",
f"LOCATION: http://192.168.1.100:{counter}/setup.xml",
f"USN: uuid:Socket-1_0-SERIAL{counter}::upnp:rootdevice",
"",
]
)
)
upnp_entries = [create_upnp_entry(0), create_upnp_entry(1)]
# Setup the component and start discovery. # Setup the component and start discovery.
with patch( with patch(
"pywemo.discover_devices", return_value=pywemo_devices "pywemo.discovery.device_from_uuid_and_location", side_effect=create_device
) as mock_discovery: ), patch("pywemo.ssdp.scan", return_value=upnp_entries) as mock_scan:
assert await async_setup_component( assert await async_setup_component(
hass, DOMAIN, {DOMAIN: {CONF_DISCOVERY: True}} hass, DOMAIN, {DOMAIN: {CONF_DISCOVERY: True}}
) )
await pywemo_registry.semaphore.acquire() # Returns after platform setup. await pywemo_registry.semaphore.acquire() # Returns after platform setup.
mock_discovery.assert_called() mock_scan.assert_called()
pywemo_devices.append(create_device(2)) # Add two of the same entries to test deduplication.
upnp_entries.extend([create_upnp_entry(2), create_upnp_entry(2)])
# Test that discovery runs periodically and the async_dispatcher_send code works. # Test that discovery runs periodically and the async_dispatcher_send code works.
async_fire_time_changed( async_fire_time_changed(