Add Bluetooth WebSocket API to subscribe to connection allocations (#136215)

This commit is contained in:
J. Nick Koston 2025-01-27 11:49:49 -10:00 committed by GitHub
parent 0b17d11683
commit e0ea5bfc51
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 345 additions and 42 deletions

View File

@ -11,13 +11,23 @@ from bluetooth_adapters import (
adapter_unique_name, adapter_unique_name,
) )
from bluetooth_data_tools import monotonic_time_coarse from bluetooth_data_tools import monotonic_time_coarse
from habluetooth import get_manager
from homeassistant.core import callback from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import HomeAssistantError
from .models import BluetoothServiceInfoBleak from .models import BluetoothServiceInfoBleak
from .storage import BluetoothStorage from .storage import BluetoothStorage
class InvalidConfigEntryID(HomeAssistantError):
"""Invalid config entry id."""
class InvalidSource(HomeAssistantError):
"""Invalid source."""
@callback @callback
def async_load_history_from_system( def async_load_history_from_system(
adapters: BluetoothAdapters, storage: BluetoothStorage adapters: BluetoothAdapters, storage: BluetoothStorage
@ -85,3 +95,14 @@ def adapter_title(adapter: str, details: AdapterDetails) -> str:
model = details.get(ADAPTER_PRODUCT, "Unknown") model = details.get(ADAPTER_PRODUCT, "Unknown")
manufacturer = details[ADAPTER_MANUFACTURER] or "Unknown" manufacturer = details[ADAPTER_MANUFACTURER] or "Unknown"
return f"{manufacturer} {model} ({unique_name})" return f"{manufacturer} {model} ({unique_name})"
def config_entry_id_to_source(hass: HomeAssistant, config_entry_id: str) -> str:
"""Convert a config entry id to a source."""
if not (entry := hass.config_entries.async_get_entry(config_entry_id)):
raise InvalidConfigEntryID(f"Config entry {config_entry_id} not found")
source = entry.unique_id
assert source is not None
if not get_manager().async_scanner_by_source(source):
raise InvalidSource(f"Source {source} not found")
return source

View File

@ -7,7 +7,7 @@ from functools import lru_cache, partial
import time import time
from typing import Any from typing import Any
from habluetooth import BluetoothScanningMode from habluetooth import BluetoothScanningMode, HaBluetoothSlotAllocations
from home_assistant_bluetooth import BluetoothServiceInfoBleak from home_assistant_bluetooth import BluetoothServiceInfoBleak
import voluptuous as vol import voluptuous as vol
@ -18,12 +18,14 @@ from homeassistant.helpers.json import json_bytes
from .api import _get_manager, async_register_callback from .api import _get_manager, async_register_callback
from .match import BluetoothCallbackMatcher from .match import BluetoothCallbackMatcher
from .models import BluetoothChange from .models import BluetoothChange
from .util import InvalidConfigEntryID, InvalidSource, config_entry_id_to_source
@callback @callback
def async_setup(hass: HomeAssistant) -> None: def async_setup(hass: HomeAssistant) -> None:
"""Set up the bluetooth websocket API.""" """Set up the bluetooth websocket API."""
websocket_api.async_register_command(hass, ws_subscribe_advertisements) websocket_api.async_register_command(hass, ws_subscribe_advertisements)
websocket_api.async_register_command(hass, ws_subscribe_connection_allocations)
@lru_cache(maxsize=1024) @lru_cache(maxsize=1024)
@ -135,6 +137,7 @@ class _AdvertisementSubscription:
self._async_added((service_info,)) self._async_added((service_info,))
@websocket_api.require_admin
@websocket_api.websocket_command( @websocket_api.websocket_command(
{ {
vol.Required("type"): "bluetooth/subscribe_advertisements", vol.Required("type"): "bluetooth/subscribe_advertisements",
@ -148,3 +151,43 @@ async def ws_subscribe_advertisements(
_AdvertisementSubscription( _AdvertisementSubscription(
hass, connection, msg["id"], BluetoothCallbackMatcher(connectable=False) hass, connection, msg["id"], BluetoothCallbackMatcher(connectable=False)
).async_start() ).async_start()
@websocket_api.require_admin
@websocket_api.websocket_command(
{
vol.Required("type"): "bluetooth/subscribe_connection_allocations",
vol.Optional("config_entry_id"): str,
}
)
@websocket_api.async_response
async def ws_subscribe_connection_allocations(
hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any]
) -> None:
"""Handle subscribe advertisements websocket command."""
ws_msg_id = msg["id"]
source: str | None = None
if config_entry_id := msg.get("config_entry_id"):
try:
source = config_entry_id_to_source(hass, config_entry_id)
except InvalidConfigEntryID as err:
connection.send_error(ws_msg_id, "invalid_config_entry_id", str(err))
return
except InvalidSource as err:
connection.send_error(ws_msg_id, "invalid_source", str(err))
return
def _async_allocations_changed(allocations: HaBluetoothSlotAllocations) -> None:
connection.send_message(
json_bytes(websocket_api.event_message(ws_msg_id, [allocations]))
)
manager = _get_manager(hass)
connection.subscriptions[ws_msg_id] = manager.async_register_allocation_callback(
_async_allocations_changed, source
)
connection.send_message(json_bytes(websocket_api.result_message(ws_msg_id)))
if current_allocations := manager.async_current_allocations(source):
connection.send_message(
json_bytes(websocket_api.event_message(ws_msg_id, current_allocations))
)

View File

@ -10,7 +10,7 @@ from unittest.mock import MagicMock, patch
from bleak import BleakClient from bleak import BleakClient
from bleak.backends.scanner import AdvertisementData, BLEDevice from bleak.backends.scanner import AdvertisementData, BLEDevice
from bluetooth_adapters import DEFAULT_ADDRESS from bluetooth_adapters import DEFAULT_ADDRESS
from habluetooth import BaseHaScanner, BluetoothManager, get_manager from habluetooth import BaseHaScanner, get_manager
from homeassistant.components.bluetooth import ( from homeassistant.components.bluetooth import (
DOMAIN, DOMAIN,
@ -21,6 +21,7 @@ from homeassistant.components.bluetooth import (
BluetoothServiceInfoBleak, BluetoothServiceInfoBleak,
async_get_advertisement_callback, async_get_advertisement_callback,
) )
from homeassistant.components.bluetooth.manager import HomeAssistantBluetoothManager
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.setup import async_setup_component from homeassistant.setup import async_setup_component
@ -57,6 +58,11 @@ BLE_DEVICE_DEFAULTS = {
} }
HCI0_SOURCE_ADDRESS = "AA:BB:CC:DD:EE:00"
HCI1_SOURCE_ADDRESS = "AA:BB:CC:DD:EE:11"
NON_CONNECTABLE_REMOTE_SOURCE_ADDRESS = "AA:BB:CC:DD:EE:FF"
@contextmanager @contextmanager
def patch_bluetooth_time(mock_time: float) -> None: def patch_bluetooth_time(mock_time: float) -> None:
"""Patch the bluetooth time.""" """Patch the bluetooth time."""
@ -101,9 +107,10 @@ def generate_ble_device(
return BLEDevice(**new) return BLEDevice(**new)
def _get_manager() -> BluetoothManager: def _get_manager() -> HomeAssistantBluetoothManager:
"""Return the bluetooth manager.""" """Return the bluetooth manager."""
return get_manager() manager: HomeAssistantBluetoothManager = get_manager()
return manager
def inject_advertisement( def inject_advertisement(

View File

@ -5,13 +5,19 @@ from unittest.mock import patch
from bleak_retry_connector import bleak_manager from bleak_retry_connector import bleak_manager
from dbus_fast.aio import message_bus from dbus_fast.aio import message_bus
from habluetooth import BaseHaRemoteScanner
import habluetooth.util as habluetooth_utils import habluetooth.util as habluetooth_utils
import pytest import pytest
from homeassistant.components import bluetooth from homeassistant.components import bluetooth
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from . import FakeScanner from . import (
HCI0_SOURCE_ADDRESS,
HCI1_SOURCE_ADDRESS,
NON_CONNECTABLE_REMOTE_SOURCE_ADDRESS,
FakeScanner,
)
@pytest.fixture(name="disable_bluez_manager_socket", autouse=True, scope="package") @pytest.fixture(name="disable_bluez_manager_socket", autouse=True, scope="package")
@ -314,8 +320,9 @@ def disable_new_discovery_flows_fixture():
@pytest.fixture @pytest.fixture
def register_hci0_scanner(hass: HomeAssistant) -> Generator[None]: def register_hci0_scanner(hass: HomeAssistant) -> Generator[None]:
"""Register an hci0 scanner.""" """Register an hci0 scanner."""
hci0_scanner = FakeScanner("hci0", "hci0") hci0_scanner = FakeScanner(HCI0_SOURCE_ADDRESS, "hci0")
cancel = bluetooth.async_register_scanner(hass, hci0_scanner) hci0_scanner.connectable = True
cancel = bluetooth.async_register_scanner(hass, hci0_scanner, connection_slots=5)
yield yield
cancel() cancel()
bluetooth.async_remove_scanner(hass, hci0_scanner.source) bluetooth.async_remove_scanner(hass, hci0_scanner.source)
@ -324,8 +331,21 @@ def register_hci0_scanner(hass: HomeAssistant) -> Generator[None]:
@pytest.fixture @pytest.fixture
def register_hci1_scanner(hass: HomeAssistant) -> Generator[None]: def register_hci1_scanner(hass: HomeAssistant) -> Generator[None]:
"""Register an hci1 scanner.""" """Register an hci1 scanner."""
hci1_scanner = FakeScanner("hci1", "hci1") hci1_scanner = FakeScanner(HCI1_SOURCE_ADDRESS, "hci1")
cancel = bluetooth.async_register_scanner(hass, hci1_scanner) hci1_scanner.connectable = True
cancel = bluetooth.async_register_scanner(hass, hci1_scanner, connection_slots=5)
yield yield
cancel() cancel()
bluetooth.async_remove_scanner(hass, hci1_scanner.source) bluetooth.async_remove_scanner(hass, hci1_scanner.source)
@pytest.fixture
def register_non_connectable_scanner(hass: HomeAssistant) -> Generator[None]:
"""Register an non connectable remote scanner."""
remote_scanner = BaseHaRemoteScanner(
NON_CONNECTABLE_REMOTE_SOURCE_ADDRESS, "non connectable", None, False
)
cancel = bluetooth.async_register_scanner(hass, remote_scanner)
yield
cancel()
bluetooth.async_remove_scanner(hass, remote_scanner.source)

View File

@ -45,6 +45,8 @@ from homeassistant.util.dt import utcnow
from homeassistant.util.json import json_loads from homeassistant.util.json import json_loads
from . import ( from . import (
HCI0_SOURCE_ADDRESS,
HCI1_SOURCE_ADDRESS,
FakeScanner, FakeScanner,
MockBleakClient, MockBleakClient,
_get_manager, _get_manager,
@ -82,7 +84,7 @@ async def test_advertisements_do_not_switch_adapters_for_no_reason(
local_name="wohand_signal_100", service_uuids=[] local_name="wohand_signal_100", service_uuids=[]
) )
inject_advertisement_with_source( inject_advertisement_with_source(
hass, switchbot_device_signal_100, switchbot_adv_signal_100, "hci0" hass, switchbot_device_signal_100, switchbot_adv_signal_100, HCI0_SOURCE_ADDRESS
) )
assert ( assert (
@ -97,7 +99,7 @@ async def test_advertisements_do_not_switch_adapters_for_no_reason(
local_name="wohand_signal_99", service_uuids=[] local_name="wohand_signal_99", service_uuids=[]
) )
inject_advertisement_with_source( inject_advertisement_with_source(
hass, switchbot_device_signal_99, switchbot_adv_signal_99, "hci0" hass, switchbot_device_signal_99, switchbot_adv_signal_99, HCI0_SOURCE_ADDRESS
) )
assert ( assert (
@ -112,7 +114,7 @@ async def test_advertisements_do_not_switch_adapters_for_no_reason(
local_name="wohand_good_signal", service_uuids=[] local_name="wohand_good_signal", service_uuids=[]
) )
inject_advertisement_with_source( inject_advertisement_with_source(
hass, switchbot_device_signal_98, switchbot_adv_signal_98, "hci1" hass, switchbot_device_signal_98, switchbot_adv_signal_98, HCI1_SOURCE_ADDRESS
) )
# should not switch to hci1 # should not switch to hci1
@ -137,7 +139,10 @@ async def test_switching_adapters_based_on_rssi(
local_name="wohand_poor_signal", service_uuids=[], rssi=-100 local_name="wohand_poor_signal", service_uuids=[], rssi=-100
) )
inject_advertisement_with_source( inject_advertisement_with_source(
hass, switchbot_device_poor_signal, switchbot_adv_poor_signal, "hci0" hass,
switchbot_device_poor_signal,
switchbot_adv_poor_signal,
HCI0_SOURCE_ADDRESS,
) )
assert ( assert (
@ -150,7 +155,10 @@ async def test_switching_adapters_based_on_rssi(
local_name="wohand_good_signal", service_uuids=[], rssi=-60 local_name="wohand_good_signal", service_uuids=[], rssi=-60
) )
inject_advertisement_with_source( inject_advertisement_with_source(
hass, switchbot_device_good_signal, switchbot_adv_good_signal, "hci1" hass,
switchbot_device_good_signal,
switchbot_adv_good_signal,
HCI1_SOURCE_ADDRESS,
) )
assert ( assert (
@ -159,7 +167,10 @@ async def test_switching_adapters_based_on_rssi(
) )
inject_advertisement_with_source( inject_advertisement_with_source(
hass, switchbot_device_good_signal, switchbot_adv_poor_signal, "hci0" hass,
switchbot_device_good_signal,
switchbot_adv_poor_signal,
HCI0_SOURCE_ADDRESS,
) )
assert ( assert (
bluetooth.async_ble_device_from_address(hass, address) bluetooth.async_ble_device_from_address(hass, address)
@ -175,7 +186,10 @@ async def test_switching_adapters_based_on_rssi(
) )
inject_advertisement_with_source( inject_advertisement_with_source(
hass, switchbot_device_similar_signal, switchbot_adv_similar_signal, "hci0" hass,
switchbot_device_similar_signal,
switchbot_adv_similar_signal,
HCI0_SOURCE_ADDRESS,
) )
assert ( assert (
bluetooth.async_ble_device_from_address(hass, address) bluetooth.async_ble_device_from_address(hass, address)
@ -198,7 +212,7 @@ async def test_switching_adapters_based_on_zero_rssi(
local_name="wohand_no_rssi", service_uuids=[], rssi=0 local_name="wohand_no_rssi", service_uuids=[], rssi=0
) )
inject_advertisement_with_source( inject_advertisement_with_source(
hass, switchbot_device_no_rssi, switchbot_adv_no_rssi, "hci0" hass, switchbot_device_no_rssi, switchbot_adv_no_rssi, HCI0_SOURCE_ADDRESS
) )
assert ( assert (
@ -211,7 +225,10 @@ async def test_switching_adapters_based_on_zero_rssi(
local_name="wohand_good_signal", service_uuids=[], rssi=-60 local_name="wohand_good_signal", service_uuids=[], rssi=-60
) )
inject_advertisement_with_source( inject_advertisement_with_source(
hass, switchbot_device_good_signal, switchbot_adv_good_signal, "hci1" hass,
switchbot_device_good_signal,
switchbot_adv_good_signal,
HCI1_SOURCE_ADDRESS,
) )
assert ( assert (
@ -220,7 +237,7 @@ async def test_switching_adapters_based_on_zero_rssi(
) )
inject_advertisement_with_source( inject_advertisement_with_source(
hass, switchbot_device_good_signal, switchbot_adv_no_rssi, "hci0" hass, switchbot_device_good_signal, switchbot_adv_no_rssi, HCI0_SOURCE_ADDRESS
) )
assert ( assert (
bluetooth.async_ble_device_from_address(hass, address) bluetooth.async_ble_device_from_address(hass, address)
@ -236,7 +253,10 @@ async def test_switching_adapters_based_on_zero_rssi(
) )
inject_advertisement_with_source( inject_advertisement_with_source(
hass, switchbot_device_similar_signal, switchbot_adv_similar_signal, "hci0" hass,
switchbot_device_similar_signal,
switchbot_adv_similar_signal,
HCI0_SOURCE_ADDRESS,
) )
assert ( assert (
bluetooth.async_ble_device_from_address(hass, address) bluetooth.async_ble_device_from_address(hass, address)
@ -266,7 +286,7 @@ async def test_switching_adapters_based_on_stale(
switchbot_device_poor_signal_hci0, switchbot_device_poor_signal_hci0,
switchbot_adv_poor_signal_hci0, switchbot_adv_poor_signal_hci0,
start_time_monotonic, start_time_monotonic,
"hci0", HCI0_SOURCE_ADDRESS,
) )
assert ( assert (
@ -285,7 +305,7 @@ async def test_switching_adapters_based_on_stale(
switchbot_device_poor_signal_hci1, switchbot_device_poor_signal_hci1,
switchbot_adv_poor_signal_hci1, switchbot_adv_poor_signal_hci1,
start_time_monotonic, start_time_monotonic,
"hci1", HCI1_SOURCE_ADDRESS,
) )
# Should not switch adapters until the advertisement is stale # Should not switch adapters until the advertisement is stale
@ -333,7 +353,7 @@ async def test_switching_adapters_based_on_stale_with_discovered_interval(
switchbot_device_poor_signal_hci0, switchbot_device_poor_signal_hci0,
switchbot_adv_poor_signal_hci0, switchbot_adv_poor_signal_hci0,
start_time_monotonic, start_time_monotonic,
"hci0", HCI0_SOURCE_ADDRESS,
) )
assert ( assert (
@ -354,7 +374,7 @@ async def test_switching_adapters_based_on_stale_with_discovered_interval(
switchbot_device_poor_signal_hci1, switchbot_device_poor_signal_hci1,
switchbot_adv_poor_signal_hci1, switchbot_adv_poor_signal_hci1,
start_time_monotonic, start_time_monotonic,
"hci1", HCI1_SOURCE_ADDRESS,
) )
# Should not switch adapters until the advertisement is stale # Should not switch adapters until the advertisement is stale
@ -368,7 +388,7 @@ async def test_switching_adapters_based_on_stale_with_discovered_interval(
switchbot_device_poor_signal_hci1, switchbot_device_poor_signal_hci1,
switchbot_adv_poor_signal_hci1, switchbot_adv_poor_signal_hci1,
start_time_monotonic + 10 + 1, start_time_monotonic + 10 + 1,
"hci1", HCI1_SOURCE_ADDRESS,
) )
# Should not switch yet since we are not within the # Should not switch yet since we are not within the
@ -383,7 +403,7 @@ async def test_switching_adapters_based_on_stale_with_discovered_interval(
switchbot_device_poor_signal_hci1, switchbot_device_poor_signal_hci1,
switchbot_adv_poor_signal_hci1, switchbot_adv_poor_signal_hci1,
start_time_monotonic + 10 + TRACKER_BUFFERING_WOBBLE_SECONDS + 1, start_time_monotonic + 10 + TRACKER_BUFFERING_WOBBLE_SECONDS + 1,
"hci1", HCI1_SOURCE_ADDRESS,
) )
# Should switch to hci1 since the previous advertisement is stale # Should switch to hci1 since the previous advertisement is stale
# even though the signal is poor because the device is now # even though the signal is poor because the device is now
@ -404,7 +424,9 @@ async def test_restore_history_from_dbus(
ble_device = generate_ble_device(address, "name") ble_device = generate_ble_device(address, "name")
history = { history = {
address: AdvertisementHistory( address: AdvertisementHistory(
ble_device, generate_advertisement_data(local_name="name"), "hci0" ble_device,
generate_advertisement_data(local_name="name"),
HCI0_SOURCE_ADDRESS,
) )
} }
@ -440,7 +462,9 @@ async def test_restore_history_from_dbus_and_remote_adapters(
ble_device = generate_ble_device(address, "name") ble_device = generate_ble_device(address, "name")
history = { history = {
address: AdvertisementHistory( address: AdvertisementHistory(
ble_device, generate_advertisement_data(local_name="name"), "hci0" ble_device,
generate_advertisement_data(local_name="name"),
HCI0_SOURCE_ADDRESS,
) )
} }
@ -480,7 +504,9 @@ async def test_restore_history_from_dbus_and_corrupted_remote_adapters(
ble_device = generate_ble_device(address, "name") ble_device = generate_ble_device(address, "name")
history = { history = {
address: AdvertisementHistory( address: AdvertisementHistory(
ble_device, generate_advertisement_data(local_name="name"), "hci0" ble_device,
generate_advertisement_data(local_name="name"),
HCI0_SOURCE_ADDRESS,
) )
} }
@ -511,7 +537,12 @@ async def test_switching_adapters_based_on_rssi_connectable_to_non_connectable(
local_name="wohand_poor_signal", service_uuids=[], rssi=-100 local_name="wohand_poor_signal", service_uuids=[], rssi=-100
) )
inject_advertisement_with_time_and_source_connectable( inject_advertisement_with_time_and_source_connectable(
hass, switchbot_device_poor_signal, switchbot_adv_poor_signal, now, "hci0", True hass,
switchbot_device_poor_signal,
switchbot_adv_poor_signal,
now,
HCI0_SOURCE_ADDRESS,
True,
) )
assert ( assert (
@ -607,7 +638,7 @@ async def test_connectable_advertisement_can_be_retrieved_with_best_path_is_non_
switchbot_device_good_signal, switchbot_device_good_signal,
switchbot_adv_good_signal, switchbot_adv_good_signal,
now, now,
"hci1", HCI1_SOURCE_ADDRESS,
False, False,
) )
@ -622,7 +653,12 @@ async def test_connectable_advertisement_can_be_retrieved_with_best_path_is_non_
local_name="wohand_poor_signal", service_uuids=[], rssi=-100 local_name="wohand_poor_signal", service_uuids=[], rssi=-100
) )
inject_advertisement_with_time_and_source_connectable( inject_advertisement_with_time_and_source_connectable(
hass, switchbot_device_poor_signal, switchbot_adv_poor_signal, now, "hci0", True hass,
switchbot_device_poor_signal,
switchbot_adv_poor_signal,
now,
HCI0_SOURCE_ADDRESS,
True,
) )
assert ( assert (
@ -662,7 +698,10 @@ async def test_switching_adapters_when_one_goes_away(
local_name="wohand_poor_signal", service_uuids=[], rssi=-100 local_name="wohand_poor_signal", service_uuids=[], rssi=-100
) )
inject_advertisement_with_source( inject_advertisement_with_source(
hass, switchbot_device_poor_signal, switchbot_adv_poor_signal, "hci0" hass,
switchbot_device_poor_signal,
switchbot_adv_poor_signal,
HCI0_SOURCE_ADDRESS,
) )
# We want to prefer the good signal when we have options # We want to prefer the good signal when we have options
@ -674,7 +713,10 @@ async def test_switching_adapters_when_one_goes_away(
cancel_hci2() cancel_hci2()
inject_advertisement_with_source( inject_advertisement_with_source(
hass, switchbot_device_poor_signal, switchbot_adv_poor_signal, "hci0" hass,
switchbot_device_poor_signal,
switchbot_adv_poor_signal,
HCI0_SOURCE_ADDRESS,
) )
# Now that hci2 is gone, we should prefer the poor signal # Now that hci2 is gone, we should prefer the poor signal
@ -713,7 +755,10 @@ async def test_switching_adapters_when_one_stop_scanning(
local_name="wohand_poor_signal", service_uuids=[], rssi=-100 local_name="wohand_poor_signal", service_uuids=[], rssi=-100
) )
inject_advertisement_with_source( inject_advertisement_with_source(
hass, switchbot_device_poor_signal, switchbot_adv_poor_signal, "hci0" hass,
switchbot_device_poor_signal,
switchbot_adv_poor_signal,
HCI0_SOURCE_ADDRESS,
) )
# We want to prefer the good signal when we have options # We want to prefer the good signal when we have options
@ -725,7 +770,10 @@ async def test_switching_adapters_when_one_stop_scanning(
hci2_scanner.scanning = False hci2_scanner.scanning = False
inject_advertisement_with_source( inject_advertisement_with_source(
hass, switchbot_device_poor_signal, switchbot_adv_poor_signal, "hci0" hass,
switchbot_device_poor_signal,
switchbot_adv_poor_signal,
HCI0_SOURCE_ADDRESS,
) )
# Now that hci2 has stopped scanning, we should prefer the poor signal # Now that hci2 has stopped scanning, we should prefer the poor signal

View File

@ -5,19 +5,25 @@ from datetime import timedelta
import time import time
from unittest.mock import ANY, patch from unittest.mock import ANY, patch
from bleak_retry_connector import Allocations
from freezegun import freeze_time from freezegun import freeze_time
import pytest import pytest
from homeassistant.components.bluetooth import DOMAIN
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.util.dt import utcnow from homeassistant.util.dt import utcnow
from . import ( from . import (
HCI0_SOURCE_ADDRESS,
HCI1_SOURCE_ADDRESS,
NON_CONNECTABLE_REMOTE_SOURCE_ADDRESS,
_get_manager,
generate_advertisement_data, generate_advertisement_data,
generate_ble_device, generate_ble_device,
inject_advertisement_with_source, inject_advertisement_with_source,
) )
from tests.common import async_fire_time_changed from tests.common import MockConfigEntry, async_fire_time_changed
from tests.typing import WebSocketGenerator from tests.typing import WebSocketGenerator
@ -38,7 +44,7 @@ async def test_subscribe_advertisements(
local_name="wohand_signal_100", service_uuids=[] local_name="wohand_signal_100", service_uuids=[]
) )
inject_advertisement_with_source( inject_advertisement_with_source(
hass, switchbot_device_signal_100, switchbot_adv_signal_100, "hci0" hass, switchbot_device_signal_100, switchbot_adv_signal_100, HCI0_SOURCE_ADDRESS
) )
client = await hass_ws_client() client = await hass_ws_client()
@ -64,7 +70,7 @@ async def test_subscribe_advertisements(
"rssi": -127, "rssi": -127,
"service_data": {}, "service_data": {},
"service_uuids": [], "service_uuids": [],
"source": "hci0", "source": HCI0_SOURCE_ADDRESS,
"time": ANY, "time": ANY,
"tx_power": -127, "tx_power": -127,
} }
@ -79,7 +85,7 @@ async def test_subscribe_advertisements(
rssi=-80, rssi=-80,
) )
inject_advertisement_with_source( inject_advertisement_with_source(
hass, switchbot_device_signal_100, switchbot_adv_signal_100, "hci1" hass, switchbot_device_signal_100, switchbot_adv_signal_100, HCI1_SOURCE_ADDRESS
) )
async with asyncio.timeout(1): async with asyncio.timeout(1):
response = await client.receive_json() response = await client.receive_json()
@ -93,7 +99,7 @@ async def test_subscribe_advertisements(
"rssi": -80, "rssi": -80,
"service_data": {}, "service_data": {},
"service_uuids": [], "service_uuids": [],
"source": "hci1", "source": HCI1_SOURCE_ADDRESS,
"time": ANY, "time": ANY,
"tx_power": -127, "tx_power": -127,
} }
@ -114,3 +120,161 @@ async def test_subscribe_advertisements(
async with asyncio.timeout(1): async with asyncio.timeout(1):
response = await client.receive_json() response = await client.receive_json()
assert response["event"] == {"remove": [{"address": "44:44:33:11:23:12"}]} assert response["event"] == {"remove": [{"address": "44:44:33:11:23:12"}]}
@pytest.mark.usefixtures("enable_bluetooth")
async def test_subscribe_subscribe_connection_allocations(
hass: HomeAssistant,
register_hci0_scanner: None,
register_hci1_scanner: None,
register_non_connectable_scanner: None,
hass_ws_client: WebSocketGenerator,
) -> None:
"""Test bluetooth subscribe_connection_allocations."""
address = "44:44:33:11:23:12"
switchbot_device_signal_100 = generate_ble_device(
address, "wohand_signal_100", rssi=-100
)
switchbot_adv_signal_100 = generate_advertisement_data(
local_name="wohand_signal_100", service_uuids=[]
)
inject_advertisement_with_source(
hass, switchbot_device_signal_100, switchbot_adv_signal_100, HCI0_SOURCE_ADDRESS
)
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "bluetooth/subscribe_connection_allocations",
}
)
async with asyncio.timeout(1):
response = await client.receive_json()
assert response["success"]
async with asyncio.timeout(1):
response = await client.receive_json()
assert response["event"] == [
{
"allocated": [],
"free": 0,
"slots": 0,
"source": NON_CONNECTABLE_REMOTE_SOURCE_ADDRESS,
}
]
manager = _get_manager()
manager.async_on_allocation_changed(
Allocations(
adapter="hci1", # Will be translated to source
slots=5,
free=4,
allocated=["AA:BB:CC:DD:EE:EE"],
)
)
async with asyncio.timeout(1):
response = await client.receive_json()
assert response["event"] == [
{
"allocated": ["AA:BB:CC:DD:EE:EE"],
"free": 4,
"slots": 5,
"source": "AA:BB:CC:DD:EE:11",
}
]
manager.async_on_allocation_changed(
Allocations(
adapter="hci1", # Will be translated to source
slots=5,
free=5,
allocated=[],
)
)
async with asyncio.timeout(1):
response = await client.receive_json()
assert response["event"] == [
{"allocated": [], "free": 5, "slots": 5, "source": HCI1_SOURCE_ADDRESS}
]
@pytest.mark.usefixtures("enable_bluetooth")
async def test_subscribe_subscribe_connection_allocations_specific_scanner(
hass: HomeAssistant,
register_non_connectable_scanner: None,
hass_ws_client: WebSocketGenerator,
) -> None:
"""Test bluetooth subscribe_connection_allocations for a specific source address."""
entry = MockConfigEntry(
domain=DOMAIN, unique_id=NON_CONNECTABLE_REMOTE_SOURCE_ADDRESS
)
entry.add_to_hass(hass)
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "bluetooth/subscribe_connection_allocations",
"config_entry_id": entry.entry_id,
}
)
async with asyncio.timeout(1):
response = await client.receive_json()
assert response["success"]
async with asyncio.timeout(1):
response = await client.receive_json()
assert response["event"] == [
{
"allocated": [],
"free": 0,
"slots": 0,
"source": NON_CONNECTABLE_REMOTE_SOURCE_ADDRESS,
}
]
@pytest.mark.usefixtures("enable_bluetooth")
async def test_subscribe_subscribe_connection_allocations_invalid_config_entry_id(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
) -> None:
"""Test bluetooth subscribe_connection_allocations for an invalid config entry id."""
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "bluetooth/subscribe_connection_allocations",
"config_entry_id": "non_existent",
}
)
async with asyncio.timeout(1):
response = await client.receive_json()
assert not response["success"]
assert response["error"]["code"] == "invalid_config_entry_id"
assert response["error"]["message"] == "Config entry non_existent not found"
@pytest.mark.usefixtures("enable_bluetooth")
async def test_subscribe_subscribe_connection_allocations_invalid_scanner(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
) -> None:
"""Test bluetooth subscribe_connection_allocations for an invalid source address."""
entry = MockConfigEntry(domain=DOMAIN, unique_id="invalid")
entry.add_to_hass(hass)
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "bluetooth/subscribe_connection_allocations",
"config_entry_id": entry.entry_id,
}
)
async with asyncio.timeout(1):
response = await client.receive_json()
assert not response["success"]
assert response["error"]["code"] == "invalid_source"
assert response["error"]["message"] == "Source invalid not found"