Implement auto switching when there are multiple bluetooth scanners (#76947)

This commit is contained in:
J. Nick Koston 2022-08-17 12:38:04 -10:00 committed by GitHub
parent 7bf13167d8
commit 071cae2c0b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 259 additions and 20 deletions

View File

@ -2,6 +2,7 @@
from __future__ import annotations
from collections.abc import Callable, Iterable
from dataclasses import dataclass
from datetime import datetime, timedelta
import itertools
import logging
@ -38,9 +39,56 @@ if TYPE_CHECKING:
FILTER_UUIDS: Final = "UUIDs"
RSSI_SWITCH_THRESHOLD = 10
STALE_ADVERTISEMENT_SECONDS = 180
_LOGGER = logging.getLogger(__name__)
@dataclass
class AdvertisementHistory:
"""Bluetooth advertisement history."""
ble_device: BLEDevice
advertisement_data: AdvertisementData
time: float
source: str
def _prefer_previous_adv(old: AdvertisementHistory, new: AdvertisementHistory) -> bool:
"""Prefer previous advertisement if it is better."""
if new.time - old.time > STALE_ADVERTISEMENT_SECONDS:
# If the old advertisement is stale, any new advertisement is preferred
if new.source != old.source:
_LOGGER.debug(
"%s (%s): Switching from %s to %s (time_elapsed:%s > stale_seconds:%s)",
new.advertisement_data.local_name,
new.ble_device.address,
old.source,
new.source,
new.time - old.time,
STALE_ADVERTISEMENT_SECONDS,
)
return False
if new.ble_device.rssi - RSSI_SWITCH_THRESHOLD > old.ble_device.rssi:
# If new advertisement is RSSI_SWITCH_THRESHOLD more, the new one is preferred
if new.source != old.source:
_LOGGER.debug(
"%s (%s): Switching from %s to %s (new_rssi:%s - threadshold:%s > old_rssi:%s)",
new.advertisement_data.local_name,
new.ble_device.address,
old.source,
new.source,
new.ble_device.rssi,
RSSI_SWITCH_THRESHOLD,
old.ble_device.rssi,
)
return False
# If the source is the different, the old one is preferred because its
# not stale and its RSSI_SWITCH_THRESHOLD less than the new one
return old.source != new.source
def _dispatch_bleak_callback(
callback: AdvertisementDataCallback,
filters: dict[str, set[str]],
@ -82,7 +130,7 @@ class BluetoothManager:
self._bleak_callbacks: list[
tuple[AdvertisementDataCallback, dict[str, set[str]]]
] = []
self.history: dict[str, tuple[BLEDevice, AdvertisementData, float, str]] = {}
self.history: dict[str, AdvertisementHistory] = {}
self._scanners: list[HaScanner] = []
@hass_callback
@ -110,7 +158,7 @@ class BluetoothManager:
@hass_callback
def async_discovered_devices(self) -> list[BLEDevice]:
"""Return all of combined best path to discovered from all the scanners."""
return [history[0] for history in self.history.values()]
return [history.ble_device for history in self.history.values()]
@hass_callback
def async_setup_unavailable_tracking(self) -> None:
@ -159,12 +207,15 @@ class BluetoothManager:
than the source from the history or the timestamp
in the history is older than 180s
"""
self.history[device.address] = (
device,
advertisement_data,
monotonic_time,
source,
new_history = AdvertisementHistory(
device, advertisement_data, monotonic_time, source
)
if (old_history := self.history.get(device.address)) and _prefer_previous_adv(
old_history, new_history
):
return
self.history[device.address] = new_history
for callback_filters in self._bleak_callbacks:
_dispatch_bleak_callback(*callback_filters, device, advertisement_data)
@ -246,13 +297,12 @@ class BluetoothManager:
if (
matcher
and (address := matcher.get(ADDRESS))
and (device_adv_data := self.history.get(address))
and (history := self.history.get(address))
):
ble_device, adv_data, _, _ = device_adv_data
try:
callback(
BluetoothServiceInfoBleak.from_advertisement(
ble_device, adv_data, SOURCE_LOCAL
history.ble_device, history.advertisement_data, SOURCE_LOCAL
),
BluetoothChange.ADVERTISEMENT,
)
@ -264,8 +314,8 @@ class BluetoothManager:
@hass_callback
def async_ble_device_from_address(self, address: str) -> BLEDevice | None:
"""Return the BLEDevice if present."""
if ble_adv := self.history.get(address):
return ble_adv[0]
if history := self.history.get(address):
return history.ble_device
return None
@hass_callback
@ -278,9 +328,9 @@ class BluetoothManager:
"""Return if the address is present."""
return [
BluetoothServiceInfoBleak.from_advertisement(
device_adv[0], device_adv[1], SOURCE_LOCAL
history.ble_device, history.advertisement_data, SOURCE_LOCAL
)
for device_adv in self.history.values()
for history in self.history.values()
]
@hass_callback
@ -312,7 +362,9 @@ class BluetoothManager:
# Replay the history since otherwise we miss devices
# that were already discovered before the callback was registered
# or we are in passive mode
for device, advertisement_data, _, _ in self.history.values():
_dispatch_bleak_callback(callback, filters, device, advertisement_data)
for history in self.history.values():
_dispatch_bleak_callback(
callback, filters, history.ble_device, history.advertisement_data
)
return _remove_callback

View File

@ -16,10 +16,22 @@ def _get_manager() -> BluetoothManager:
def inject_advertisement(device: BLEDevice, adv: AdvertisementData) -> None:
"""Return the underlying scanner that has been wrapped."""
return _get_manager().scanner_adv_received(
device, adv, time.monotonic(), SOURCE_LOCAL
)
"""Inject an advertisement into the manager."""
return inject_advertisement_with_source(device, adv, SOURCE_LOCAL)
def inject_advertisement_with_source(
device: BLEDevice, adv: AdvertisementData, source: str
) -> None:
"""Inject an advertisement into the manager from a specific source."""
inject_advertisement_with_time_and_source(device, adv, time.monotonic(), source)
def inject_advertisement_with_time_and_source(
device: BLEDevice, adv: AdvertisementData, time: float, source: str
) -> None:
"""Inject an advertisement into the manager from a specific source at a time."""
return _get_manager().scanner_adv_received(device, adv, time, source)
def patch_all_discovered_devices(mock_discovered: list[BLEDevice]) -> None:

View File

@ -0,0 +1,175 @@
"""Tests for the Bluetooth integration manager."""
from bleak.backends.scanner import AdvertisementData, BLEDevice
from homeassistant.components import bluetooth
from homeassistant.components.bluetooth.manager import STALE_ADVERTISEMENT_SECONDS
from . import (
inject_advertisement_with_source,
inject_advertisement_with_time_and_source,
)
async def test_advertisements_do_not_switch_adapters_for_no_reason(
hass, enable_bluetooth
):
"""Test we only switch adapters when needed."""
address = "44:44:33:11:23:12"
switchbot_device_signal_100 = BLEDevice(address, "wohand_signal_100", rssi=-100)
switchbot_adv_signal_100 = AdvertisementData(
local_name="wohand_signal_100", service_uuids=[]
)
inject_advertisement_with_source(
switchbot_device_signal_100, switchbot_adv_signal_100, "hci0"
)
assert (
bluetooth.async_ble_device_from_address(hass, address)
is switchbot_device_signal_100
)
switchbot_device_signal_99 = BLEDevice(address, "wohand_signal_99", rssi=-99)
switchbot_adv_signal_99 = AdvertisementData(
local_name="wohand_signal_99", service_uuids=[]
)
inject_advertisement_with_source(
switchbot_device_signal_99, switchbot_adv_signal_99, "hci0"
)
assert (
bluetooth.async_ble_device_from_address(hass, address)
is switchbot_device_signal_99
)
switchbot_device_signal_98 = BLEDevice(address, "wohand_good_signal", rssi=-98)
switchbot_adv_signal_98 = AdvertisementData(
local_name="wohand_good_signal", service_uuids=[]
)
inject_advertisement_with_source(
switchbot_device_signal_98, switchbot_adv_signal_98, "hci1"
)
# should not switch to hci1
assert (
bluetooth.async_ble_device_from_address(hass, address)
is switchbot_device_signal_99
)
async def test_switching_adapters_based_on_rssi(hass, enable_bluetooth):
"""Test switching adapters based on rssi."""
address = "44:44:33:11:23:45"
switchbot_device_poor_signal = BLEDevice(address, "wohand_poor_signal", rssi=-100)
switchbot_adv_poor_signal = AdvertisementData(
local_name="wohand_poor_signal", service_uuids=[]
)
inject_advertisement_with_source(
switchbot_device_poor_signal, switchbot_adv_poor_signal, "hci0"
)
assert (
bluetooth.async_ble_device_from_address(hass, address)
is switchbot_device_poor_signal
)
switchbot_device_good_signal = BLEDevice(address, "wohand_good_signal", rssi=-60)
switchbot_adv_good_signal = AdvertisementData(
local_name="wohand_good_signal", service_uuids=[]
)
inject_advertisement_with_source(
switchbot_device_good_signal, switchbot_adv_good_signal, "hci1"
)
assert (
bluetooth.async_ble_device_from_address(hass, address)
is switchbot_device_good_signal
)
inject_advertisement_with_source(
switchbot_device_good_signal, switchbot_adv_poor_signal, "hci0"
)
assert (
bluetooth.async_ble_device_from_address(hass, address)
is switchbot_device_good_signal
)
# We should not switch adapters unless the signal hits the threshold
switchbot_device_similar_signal = BLEDevice(
address, "wohand_similar_signal", rssi=-62
)
switchbot_adv_similar_signal = AdvertisementData(
local_name="wohand_similar_signal", service_uuids=[]
)
inject_advertisement_with_source(
switchbot_device_similar_signal, switchbot_adv_similar_signal, "hci0"
)
assert (
bluetooth.async_ble_device_from_address(hass, address)
is switchbot_device_good_signal
)
async def test_switching_adapters_based_on_stale(hass, enable_bluetooth):
"""Test switching adapters based on the previous advertisement being stale."""
address = "44:44:33:11:23:41"
start_time_monotonic = 50.0
switchbot_device_poor_signal_hci0 = BLEDevice(
address, "wohand_poor_signal_hci0", rssi=-100
)
switchbot_adv_poor_signal_hci0 = AdvertisementData(
local_name="wohand_poor_signal_hci0", service_uuids=[]
)
inject_advertisement_with_time_and_source(
switchbot_device_poor_signal_hci0,
switchbot_adv_poor_signal_hci0,
start_time_monotonic,
"hci0",
)
assert (
bluetooth.async_ble_device_from_address(hass, address)
is switchbot_device_poor_signal_hci0
)
switchbot_device_poor_signal_hci1 = BLEDevice(
address, "wohand_poor_signal_hci1", rssi=-99
)
switchbot_adv_poor_signal_hci1 = AdvertisementData(
local_name="wohand_poor_signal_hci1", service_uuids=[]
)
inject_advertisement_with_time_and_source(
switchbot_device_poor_signal_hci1,
switchbot_adv_poor_signal_hci1,
start_time_monotonic,
"hci1",
)
# Should not switch adapters until the advertisement is stale
assert (
bluetooth.async_ble_device_from_address(hass, address)
is switchbot_device_poor_signal_hci0
)
# Should switch to hci1 since the previous advertisement is stale
# even though the signal is poor because the device is now
# likely unreachable via hci0
inject_advertisement_with_time_and_source(
switchbot_device_poor_signal_hci1,
switchbot_adv_poor_signal_hci1,
start_time_monotonic + STALE_ADVERTISEMENT_SECONDS + 1,
"hci1",
)
assert (
bluetooth.async_ble_device_from_address(hass, address)
is switchbot_device_poor_signal_hci1
)