From e0ea5bfc518e57e0e830be765d595310282cf7a4 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 27 Jan 2025 11:49:49 -1000 Subject: [PATCH] Add Bluetooth WebSocket API to subscribe to connection allocations (#136215) --- homeassistant/components/bluetooth/util.py | 23 ++- .../components/bluetooth/websocket_api.py | 45 ++++- tests/components/bluetooth/__init__.py | 13 +- tests/components/bluetooth/conftest.py | 30 ++- tests/components/bluetooth/test_manager.py | 102 +++++++--- .../bluetooth/test_websocket_api.py | 174 +++++++++++++++++- 6 files changed, 345 insertions(+), 42 deletions(-) diff --git a/homeassistant/components/bluetooth/util.py b/homeassistant/components/bluetooth/util.py index 8c7ad13294a..ca2e0180c00 100644 --- a/homeassistant/components/bluetooth/util.py +++ b/homeassistant/components/bluetooth/util.py @@ -11,13 +11,23 @@ from bluetooth_adapters import ( adapter_unique_name, ) from bluetooth_data_tools import monotonic_time_coarse +from habluetooth import get_manager -from homeassistant.core import callback +from homeassistant.core import HomeAssistant, callback +from homeassistant.exceptions import HomeAssistantError from .models import BluetoothServiceInfoBleak from .storage import BluetoothStorage +class InvalidConfigEntryID(HomeAssistantError): + """Invalid config entry id.""" + + +class InvalidSource(HomeAssistantError): + """Invalid source.""" + + @callback def async_load_history_from_system( adapters: BluetoothAdapters, storage: BluetoothStorage @@ -85,3 +95,14 @@ def adapter_title(adapter: str, details: AdapterDetails) -> str: model = details.get(ADAPTER_PRODUCT, "Unknown") manufacturer = details[ADAPTER_MANUFACTURER] or "Unknown" return f"{manufacturer} {model} ({unique_name})" + + +def config_entry_id_to_source(hass: HomeAssistant, config_entry_id: str) -> str: + """Convert a config entry id to a source.""" + if not (entry := hass.config_entries.async_get_entry(config_entry_id)): + raise InvalidConfigEntryID(f"Config entry {config_entry_id} not found") + source = entry.unique_id + assert source is not None + if not get_manager().async_scanner_by_source(source): + raise InvalidSource(f"Source {source} not found") + return source diff --git a/homeassistant/components/bluetooth/websocket_api.py b/homeassistant/components/bluetooth/websocket_api.py index 45445a7a00f..2829617d09e 100644 --- a/homeassistant/components/bluetooth/websocket_api.py +++ b/homeassistant/components/bluetooth/websocket_api.py @@ -7,7 +7,7 @@ from functools import lru_cache, partial import time from typing import Any -from habluetooth import BluetoothScanningMode +from habluetooth import BluetoothScanningMode, HaBluetoothSlotAllocations from home_assistant_bluetooth import BluetoothServiceInfoBleak import voluptuous as vol @@ -18,12 +18,14 @@ from homeassistant.helpers.json import json_bytes from .api import _get_manager, async_register_callback from .match import BluetoothCallbackMatcher from .models import BluetoothChange +from .util import InvalidConfigEntryID, InvalidSource, config_entry_id_to_source @callback def async_setup(hass: HomeAssistant) -> None: """Set up the bluetooth websocket API.""" websocket_api.async_register_command(hass, ws_subscribe_advertisements) + websocket_api.async_register_command(hass, ws_subscribe_connection_allocations) @lru_cache(maxsize=1024) @@ -135,6 +137,7 @@ class _AdvertisementSubscription: self._async_added((service_info,)) +@websocket_api.require_admin @websocket_api.websocket_command( { vol.Required("type"): "bluetooth/subscribe_advertisements", @@ -148,3 +151,43 @@ async def ws_subscribe_advertisements( _AdvertisementSubscription( hass, connection, msg["id"], BluetoothCallbackMatcher(connectable=False) ).async_start() + + +@websocket_api.require_admin +@websocket_api.websocket_command( + { + vol.Required("type"): "bluetooth/subscribe_connection_allocations", + vol.Optional("config_entry_id"): str, + } +) +@websocket_api.async_response +async def ws_subscribe_connection_allocations( + hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any] +) -> None: + """Handle subscribe advertisements websocket command.""" + ws_msg_id = msg["id"] + source: str | None = None + if config_entry_id := msg.get("config_entry_id"): + try: + source = config_entry_id_to_source(hass, config_entry_id) + except InvalidConfigEntryID as err: + connection.send_error(ws_msg_id, "invalid_config_entry_id", str(err)) + return + except InvalidSource as err: + connection.send_error(ws_msg_id, "invalid_source", str(err)) + return + + def _async_allocations_changed(allocations: HaBluetoothSlotAllocations) -> None: + connection.send_message( + json_bytes(websocket_api.event_message(ws_msg_id, [allocations])) + ) + + manager = _get_manager(hass) + connection.subscriptions[ws_msg_id] = manager.async_register_allocation_callback( + _async_allocations_changed, source + ) + connection.send_message(json_bytes(websocket_api.result_message(ws_msg_id))) + if current_allocations := manager.async_current_allocations(source): + connection.send_message( + json_bytes(websocket_api.event_message(ws_msg_id, current_allocations)) + ) diff --git a/tests/components/bluetooth/__init__.py b/tests/components/bluetooth/__init__.py index c672de7424b..31d301e2dac 100644 --- a/tests/components/bluetooth/__init__.py +++ b/tests/components/bluetooth/__init__.py @@ -10,7 +10,7 @@ from unittest.mock import MagicMock, patch from bleak import BleakClient from bleak.backends.scanner import AdvertisementData, BLEDevice from bluetooth_adapters import DEFAULT_ADDRESS -from habluetooth import BaseHaScanner, BluetoothManager, get_manager +from habluetooth import BaseHaScanner, get_manager from homeassistant.components.bluetooth import ( DOMAIN, @@ -21,6 +21,7 @@ from homeassistant.components.bluetooth import ( BluetoothServiceInfoBleak, async_get_advertisement_callback, ) +from homeassistant.components.bluetooth.manager import HomeAssistantBluetoothManager from homeassistant.core import HomeAssistant from homeassistant.setup import async_setup_component @@ -57,6 +58,11 @@ BLE_DEVICE_DEFAULTS = { } +HCI0_SOURCE_ADDRESS = "AA:BB:CC:DD:EE:00" +HCI1_SOURCE_ADDRESS = "AA:BB:CC:DD:EE:11" +NON_CONNECTABLE_REMOTE_SOURCE_ADDRESS = "AA:BB:CC:DD:EE:FF" + + @contextmanager def patch_bluetooth_time(mock_time: float) -> None: """Patch the bluetooth time.""" @@ -101,9 +107,10 @@ def generate_ble_device( return BLEDevice(**new) -def _get_manager() -> BluetoothManager: +def _get_manager() -> HomeAssistantBluetoothManager: """Return the bluetooth manager.""" - return get_manager() + manager: HomeAssistantBluetoothManager = get_manager() + return manager def inject_advertisement( diff --git a/tests/components/bluetooth/conftest.py b/tests/components/bluetooth/conftest.py index 6fa0b375e81..e07b580acb2 100644 --- a/tests/components/bluetooth/conftest.py +++ b/tests/components/bluetooth/conftest.py @@ -5,13 +5,19 @@ from unittest.mock import patch from bleak_retry_connector import bleak_manager from dbus_fast.aio import message_bus +from habluetooth import BaseHaRemoteScanner import habluetooth.util as habluetooth_utils import pytest from homeassistant.components import bluetooth from homeassistant.core import HomeAssistant -from . import FakeScanner +from . import ( + HCI0_SOURCE_ADDRESS, + HCI1_SOURCE_ADDRESS, + NON_CONNECTABLE_REMOTE_SOURCE_ADDRESS, + FakeScanner, +) @pytest.fixture(name="disable_bluez_manager_socket", autouse=True, scope="package") @@ -314,8 +320,9 @@ def disable_new_discovery_flows_fixture(): @pytest.fixture def register_hci0_scanner(hass: HomeAssistant) -> Generator[None]: """Register an hci0 scanner.""" - hci0_scanner = FakeScanner("hci0", "hci0") - cancel = bluetooth.async_register_scanner(hass, hci0_scanner) + hci0_scanner = FakeScanner(HCI0_SOURCE_ADDRESS, "hci0") + hci0_scanner.connectable = True + cancel = bluetooth.async_register_scanner(hass, hci0_scanner, connection_slots=5) yield cancel() bluetooth.async_remove_scanner(hass, hci0_scanner.source) @@ -324,8 +331,21 @@ def register_hci0_scanner(hass: HomeAssistant) -> Generator[None]: @pytest.fixture def register_hci1_scanner(hass: HomeAssistant) -> Generator[None]: """Register an hci1 scanner.""" - hci1_scanner = FakeScanner("hci1", "hci1") - cancel = bluetooth.async_register_scanner(hass, hci1_scanner) + hci1_scanner = FakeScanner(HCI1_SOURCE_ADDRESS, "hci1") + hci1_scanner.connectable = True + cancel = bluetooth.async_register_scanner(hass, hci1_scanner, connection_slots=5) yield cancel() bluetooth.async_remove_scanner(hass, hci1_scanner.source) + + +@pytest.fixture +def register_non_connectable_scanner(hass: HomeAssistant) -> Generator[None]: + """Register an non connectable remote scanner.""" + remote_scanner = BaseHaRemoteScanner( + NON_CONNECTABLE_REMOTE_SOURCE_ADDRESS, "non connectable", None, False + ) + cancel = bluetooth.async_register_scanner(hass, remote_scanner) + yield + cancel() + bluetooth.async_remove_scanner(hass, remote_scanner.source) diff --git a/tests/components/bluetooth/test_manager.py b/tests/components/bluetooth/test_manager.py index 77071368dd0..c7fc80ba068 100644 --- a/tests/components/bluetooth/test_manager.py +++ b/tests/components/bluetooth/test_manager.py @@ -45,6 +45,8 @@ from homeassistant.util.dt import utcnow from homeassistant.util.json import json_loads from . import ( + HCI0_SOURCE_ADDRESS, + HCI1_SOURCE_ADDRESS, FakeScanner, MockBleakClient, _get_manager, @@ -82,7 +84,7 @@ async def test_advertisements_do_not_switch_adapters_for_no_reason( local_name="wohand_signal_100", service_uuids=[] ) inject_advertisement_with_source( - hass, switchbot_device_signal_100, switchbot_adv_signal_100, "hci0" + hass, switchbot_device_signal_100, switchbot_adv_signal_100, HCI0_SOURCE_ADDRESS ) assert ( @@ -97,7 +99,7 @@ async def test_advertisements_do_not_switch_adapters_for_no_reason( local_name="wohand_signal_99", service_uuids=[] ) inject_advertisement_with_source( - hass, switchbot_device_signal_99, switchbot_adv_signal_99, "hci0" + hass, switchbot_device_signal_99, switchbot_adv_signal_99, HCI0_SOURCE_ADDRESS ) assert ( @@ -112,7 +114,7 @@ async def test_advertisements_do_not_switch_adapters_for_no_reason( local_name="wohand_good_signal", service_uuids=[] ) inject_advertisement_with_source( - hass, switchbot_device_signal_98, switchbot_adv_signal_98, "hci1" + hass, switchbot_device_signal_98, switchbot_adv_signal_98, HCI1_SOURCE_ADDRESS ) # should not switch to hci1 @@ -137,7 +139,10 @@ async def test_switching_adapters_based_on_rssi( local_name="wohand_poor_signal", service_uuids=[], rssi=-100 ) inject_advertisement_with_source( - hass, switchbot_device_poor_signal, switchbot_adv_poor_signal, "hci0" + hass, + switchbot_device_poor_signal, + switchbot_adv_poor_signal, + HCI0_SOURCE_ADDRESS, ) assert ( @@ -150,7 +155,10 @@ async def test_switching_adapters_based_on_rssi( local_name="wohand_good_signal", service_uuids=[], rssi=-60 ) inject_advertisement_with_source( - hass, switchbot_device_good_signal, switchbot_adv_good_signal, "hci1" + hass, + switchbot_device_good_signal, + switchbot_adv_good_signal, + HCI1_SOURCE_ADDRESS, ) assert ( @@ -159,7 +167,10 @@ async def test_switching_adapters_based_on_rssi( ) inject_advertisement_with_source( - hass, switchbot_device_good_signal, switchbot_adv_poor_signal, "hci0" + hass, + switchbot_device_good_signal, + switchbot_adv_poor_signal, + HCI0_SOURCE_ADDRESS, ) assert ( bluetooth.async_ble_device_from_address(hass, address) @@ -175,7 +186,10 @@ async def test_switching_adapters_based_on_rssi( ) inject_advertisement_with_source( - hass, switchbot_device_similar_signal, switchbot_adv_similar_signal, "hci0" + hass, + switchbot_device_similar_signal, + switchbot_adv_similar_signal, + HCI0_SOURCE_ADDRESS, ) assert ( bluetooth.async_ble_device_from_address(hass, address) @@ -198,7 +212,7 @@ async def test_switching_adapters_based_on_zero_rssi( local_name="wohand_no_rssi", service_uuids=[], rssi=0 ) inject_advertisement_with_source( - hass, switchbot_device_no_rssi, switchbot_adv_no_rssi, "hci0" + hass, switchbot_device_no_rssi, switchbot_adv_no_rssi, HCI0_SOURCE_ADDRESS ) assert ( @@ -211,7 +225,10 @@ async def test_switching_adapters_based_on_zero_rssi( local_name="wohand_good_signal", service_uuids=[], rssi=-60 ) inject_advertisement_with_source( - hass, switchbot_device_good_signal, switchbot_adv_good_signal, "hci1" + hass, + switchbot_device_good_signal, + switchbot_adv_good_signal, + HCI1_SOURCE_ADDRESS, ) assert ( @@ -220,7 +237,7 @@ async def test_switching_adapters_based_on_zero_rssi( ) inject_advertisement_with_source( - hass, switchbot_device_good_signal, switchbot_adv_no_rssi, "hci0" + hass, switchbot_device_good_signal, switchbot_adv_no_rssi, HCI0_SOURCE_ADDRESS ) assert ( bluetooth.async_ble_device_from_address(hass, address) @@ -236,7 +253,10 @@ async def test_switching_adapters_based_on_zero_rssi( ) inject_advertisement_with_source( - hass, switchbot_device_similar_signal, switchbot_adv_similar_signal, "hci0" + hass, + switchbot_device_similar_signal, + switchbot_adv_similar_signal, + HCI0_SOURCE_ADDRESS, ) assert ( bluetooth.async_ble_device_from_address(hass, address) @@ -266,7 +286,7 @@ async def test_switching_adapters_based_on_stale( switchbot_device_poor_signal_hci0, switchbot_adv_poor_signal_hci0, start_time_monotonic, - "hci0", + HCI0_SOURCE_ADDRESS, ) assert ( @@ -285,7 +305,7 @@ async def test_switching_adapters_based_on_stale( switchbot_device_poor_signal_hci1, switchbot_adv_poor_signal_hci1, start_time_monotonic, - "hci1", + HCI1_SOURCE_ADDRESS, ) # Should not switch adapters until the advertisement is stale @@ -333,7 +353,7 @@ async def test_switching_adapters_based_on_stale_with_discovered_interval( switchbot_device_poor_signal_hci0, switchbot_adv_poor_signal_hci0, start_time_monotonic, - "hci0", + HCI0_SOURCE_ADDRESS, ) assert ( @@ -354,7 +374,7 @@ async def test_switching_adapters_based_on_stale_with_discovered_interval( switchbot_device_poor_signal_hci1, switchbot_adv_poor_signal_hci1, start_time_monotonic, - "hci1", + HCI1_SOURCE_ADDRESS, ) # Should not switch adapters until the advertisement is stale @@ -368,7 +388,7 @@ async def test_switching_adapters_based_on_stale_with_discovered_interval( switchbot_device_poor_signal_hci1, switchbot_adv_poor_signal_hci1, start_time_monotonic + 10 + 1, - "hci1", + HCI1_SOURCE_ADDRESS, ) # Should not switch yet since we are not within the @@ -383,7 +403,7 @@ async def test_switching_adapters_based_on_stale_with_discovered_interval( switchbot_device_poor_signal_hci1, switchbot_adv_poor_signal_hci1, start_time_monotonic + 10 + TRACKER_BUFFERING_WOBBLE_SECONDS + 1, - "hci1", + HCI1_SOURCE_ADDRESS, ) # Should switch to hci1 since the previous advertisement is stale # even though the signal is poor because the device is now @@ -404,7 +424,9 @@ async def test_restore_history_from_dbus( ble_device = generate_ble_device(address, "name") history = { address: AdvertisementHistory( - ble_device, generate_advertisement_data(local_name="name"), "hci0" + ble_device, + generate_advertisement_data(local_name="name"), + HCI0_SOURCE_ADDRESS, ) } @@ -440,7 +462,9 @@ async def test_restore_history_from_dbus_and_remote_adapters( ble_device = generate_ble_device(address, "name") history = { address: AdvertisementHistory( - ble_device, generate_advertisement_data(local_name="name"), "hci0" + ble_device, + generate_advertisement_data(local_name="name"), + HCI0_SOURCE_ADDRESS, ) } @@ -480,7 +504,9 @@ async def test_restore_history_from_dbus_and_corrupted_remote_adapters( ble_device = generate_ble_device(address, "name") history = { address: AdvertisementHistory( - ble_device, generate_advertisement_data(local_name="name"), "hci0" + ble_device, + generate_advertisement_data(local_name="name"), + HCI0_SOURCE_ADDRESS, ) } @@ -511,7 +537,12 @@ async def test_switching_adapters_based_on_rssi_connectable_to_non_connectable( local_name="wohand_poor_signal", service_uuids=[], rssi=-100 ) inject_advertisement_with_time_and_source_connectable( - hass, switchbot_device_poor_signal, switchbot_adv_poor_signal, now, "hci0", True + hass, + switchbot_device_poor_signal, + switchbot_adv_poor_signal, + now, + HCI0_SOURCE_ADDRESS, + True, ) assert ( @@ -607,7 +638,7 @@ async def test_connectable_advertisement_can_be_retrieved_with_best_path_is_non_ switchbot_device_good_signal, switchbot_adv_good_signal, now, - "hci1", + HCI1_SOURCE_ADDRESS, False, ) @@ -622,7 +653,12 @@ async def test_connectable_advertisement_can_be_retrieved_with_best_path_is_non_ local_name="wohand_poor_signal", service_uuids=[], rssi=-100 ) inject_advertisement_with_time_and_source_connectable( - hass, switchbot_device_poor_signal, switchbot_adv_poor_signal, now, "hci0", True + hass, + switchbot_device_poor_signal, + switchbot_adv_poor_signal, + now, + HCI0_SOURCE_ADDRESS, + True, ) assert ( @@ -662,7 +698,10 @@ async def test_switching_adapters_when_one_goes_away( local_name="wohand_poor_signal", service_uuids=[], rssi=-100 ) inject_advertisement_with_source( - hass, switchbot_device_poor_signal, switchbot_adv_poor_signal, "hci0" + hass, + switchbot_device_poor_signal, + switchbot_adv_poor_signal, + HCI0_SOURCE_ADDRESS, ) # We want to prefer the good signal when we have options @@ -674,7 +713,10 @@ async def test_switching_adapters_when_one_goes_away( cancel_hci2() inject_advertisement_with_source( - hass, switchbot_device_poor_signal, switchbot_adv_poor_signal, "hci0" + hass, + switchbot_device_poor_signal, + switchbot_adv_poor_signal, + HCI0_SOURCE_ADDRESS, ) # Now that hci2 is gone, we should prefer the poor signal @@ -713,7 +755,10 @@ async def test_switching_adapters_when_one_stop_scanning( local_name="wohand_poor_signal", service_uuids=[], rssi=-100 ) inject_advertisement_with_source( - hass, switchbot_device_poor_signal, switchbot_adv_poor_signal, "hci0" + hass, + switchbot_device_poor_signal, + switchbot_adv_poor_signal, + HCI0_SOURCE_ADDRESS, ) # We want to prefer the good signal when we have options @@ -725,7 +770,10 @@ async def test_switching_adapters_when_one_stop_scanning( hci2_scanner.scanning = False inject_advertisement_with_source( - hass, switchbot_device_poor_signal, switchbot_adv_poor_signal, "hci0" + hass, + switchbot_device_poor_signal, + switchbot_adv_poor_signal, + HCI0_SOURCE_ADDRESS, ) # Now that hci2 has stopped scanning, we should prefer the poor signal diff --git a/tests/components/bluetooth/test_websocket_api.py b/tests/components/bluetooth/test_websocket_api.py index c9670f2f895..d9289fe8380 100644 --- a/tests/components/bluetooth/test_websocket_api.py +++ b/tests/components/bluetooth/test_websocket_api.py @@ -5,19 +5,25 @@ from datetime import timedelta import time from unittest.mock import ANY, patch +from bleak_retry_connector import Allocations from freezegun import freeze_time import pytest +from homeassistant.components.bluetooth import DOMAIN from homeassistant.core import HomeAssistant from homeassistant.util.dt import utcnow from . import ( + HCI0_SOURCE_ADDRESS, + HCI1_SOURCE_ADDRESS, + NON_CONNECTABLE_REMOTE_SOURCE_ADDRESS, + _get_manager, generate_advertisement_data, generate_ble_device, inject_advertisement_with_source, ) -from tests.common import async_fire_time_changed +from tests.common import MockConfigEntry, async_fire_time_changed from tests.typing import WebSocketGenerator @@ -38,7 +44,7 @@ async def test_subscribe_advertisements( local_name="wohand_signal_100", service_uuids=[] ) inject_advertisement_with_source( - hass, switchbot_device_signal_100, switchbot_adv_signal_100, "hci0" + hass, switchbot_device_signal_100, switchbot_adv_signal_100, HCI0_SOURCE_ADDRESS ) client = await hass_ws_client() @@ -64,7 +70,7 @@ async def test_subscribe_advertisements( "rssi": -127, "service_data": {}, "service_uuids": [], - "source": "hci0", + "source": HCI0_SOURCE_ADDRESS, "time": ANY, "tx_power": -127, } @@ -79,7 +85,7 @@ async def test_subscribe_advertisements( rssi=-80, ) inject_advertisement_with_source( - hass, switchbot_device_signal_100, switchbot_adv_signal_100, "hci1" + hass, switchbot_device_signal_100, switchbot_adv_signal_100, HCI1_SOURCE_ADDRESS ) async with asyncio.timeout(1): response = await client.receive_json() @@ -93,7 +99,7 @@ async def test_subscribe_advertisements( "rssi": -80, "service_data": {}, "service_uuids": [], - "source": "hci1", + "source": HCI1_SOURCE_ADDRESS, "time": ANY, "tx_power": -127, } @@ -114,3 +120,161 @@ async def test_subscribe_advertisements( async with asyncio.timeout(1): response = await client.receive_json() assert response["event"] == {"remove": [{"address": "44:44:33:11:23:12"}]} + + +@pytest.mark.usefixtures("enable_bluetooth") +async def test_subscribe_subscribe_connection_allocations( + hass: HomeAssistant, + register_hci0_scanner: None, + register_hci1_scanner: None, + register_non_connectable_scanner: None, + hass_ws_client: WebSocketGenerator, +) -> None: + """Test bluetooth subscribe_connection_allocations.""" + address = "44:44:33:11:23:12" + + switchbot_device_signal_100 = generate_ble_device( + address, "wohand_signal_100", rssi=-100 + ) + switchbot_adv_signal_100 = generate_advertisement_data( + local_name="wohand_signal_100", service_uuids=[] + ) + inject_advertisement_with_source( + hass, switchbot_device_signal_100, switchbot_adv_signal_100, HCI0_SOURCE_ADDRESS + ) + + client = await hass_ws_client() + await client.send_json( + { + "id": 1, + "type": "bluetooth/subscribe_connection_allocations", + } + ) + async with asyncio.timeout(1): + response = await client.receive_json() + assert response["success"] + + async with asyncio.timeout(1): + response = await client.receive_json() + + assert response["event"] == [ + { + "allocated": [], + "free": 0, + "slots": 0, + "source": NON_CONNECTABLE_REMOTE_SOURCE_ADDRESS, + } + ] + + manager = _get_manager() + manager.async_on_allocation_changed( + Allocations( + adapter="hci1", # Will be translated to source + slots=5, + free=4, + allocated=["AA:BB:CC:DD:EE:EE"], + ) + ) + async with asyncio.timeout(1): + response = await client.receive_json() + assert response["event"] == [ + { + "allocated": ["AA:BB:CC:DD:EE:EE"], + "free": 4, + "slots": 5, + "source": "AA:BB:CC:DD:EE:11", + } + ] + manager.async_on_allocation_changed( + Allocations( + adapter="hci1", # Will be translated to source + slots=5, + free=5, + allocated=[], + ) + ) + async with asyncio.timeout(1): + response = await client.receive_json() + assert response["event"] == [ + {"allocated": [], "free": 5, "slots": 5, "source": HCI1_SOURCE_ADDRESS} + ] + + +@pytest.mark.usefixtures("enable_bluetooth") +async def test_subscribe_subscribe_connection_allocations_specific_scanner( + hass: HomeAssistant, + register_non_connectable_scanner: None, + hass_ws_client: WebSocketGenerator, +) -> None: + """Test bluetooth subscribe_connection_allocations for a specific source address.""" + entry = MockConfigEntry( + domain=DOMAIN, unique_id=NON_CONNECTABLE_REMOTE_SOURCE_ADDRESS + ) + entry.add_to_hass(hass) + client = await hass_ws_client() + await client.send_json( + { + "id": 1, + "type": "bluetooth/subscribe_connection_allocations", + "config_entry_id": entry.entry_id, + } + ) + async with asyncio.timeout(1): + response = await client.receive_json() + assert response["success"] + + async with asyncio.timeout(1): + response = await client.receive_json() + + assert response["event"] == [ + { + "allocated": [], + "free": 0, + "slots": 0, + "source": NON_CONNECTABLE_REMOTE_SOURCE_ADDRESS, + } + ] + + +@pytest.mark.usefixtures("enable_bluetooth") +async def test_subscribe_subscribe_connection_allocations_invalid_config_entry_id( + hass: HomeAssistant, + hass_ws_client: WebSocketGenerator, +) -> None: + """Test bluetooth subscribe_connection_allocations for an invalid config entry id.""" + client = await hass_ws_client() + await client.send_json( + { + "id": 1, + "type": "bluetooth/subscribe_connection_allocations", + "config_entry_id": "non_existent", + } + ) + async with asyncio.timeout(1): + response = await client.receive_json() + assert not response["success"] + assert response["error"]["code"] == "invalid_config_entry_id" + assert response["error"]["message"] == "Config entry non_existent not found" + + +@pytest.mark.usefixtures("enable_bluetooth") +async def test_subscribe_subscribe_connection_allocations_invalid_scanner( + hass: HomeAssistant, + hass_ws_client: WebSocketGenerator, +) -> None: + """Test bluetooth subscribe_connection_allocations for an invalid source address.""" + entry = MockConfigEntry(domain=DOMAIN, unique_id="invalid") + entry.add_to_hass(hass) + client = await hass_ws_client() + await client.send_json( + { + "id": 1, + "type": "bluetooth/subscribe_connection_allocations", + "config_entry_id": entry.entry_id, + } + ) + async with asyncio.timeout(1): + response = await client.receive_json() + assert not response["success"] + assert response["error"]["code"] == "invalid_source" + assert response["error"]["message"] == "Source invalid not found"