diff --git a/homeassistant/components/bluetooth/__init__.py b/homeassistant/components/bluetooth/__init__.py index ea058a72759..8a1897c5ec2 100644 --- a/homeassistant/components/bluetooth/__init__.py +++ b/homeassistant/components/bluetooth/__init__.py @@ -2,11 +2,12 @@ from __future__ import annotations from collections.abc import Callable +from dataclasses import dataclass from enum import Enum import fnmatch import logging import platform -from typing import Final, TypedDict +from typing import Final, TypedDict, Union from bleak import BleakError from bleak.backends.device import BLEDevice @@ -42,6 +43,37 @@ MAX_REMEMBER_ADDRESSES: Final = 2048 SOURCE_LOCAL: Final = "local" +@dataclass +class BluetoothServiceInfoBleak(BluetoothServiceInfo): # type: ignore[misc] + """BluetoothServiceInfo with bleak data. + + Integrations may need BLEDevice and AdvertisementData + to connect to the device without having bleak trigger + another scan to translate the address to the system's + internal details. + """ + + device: BLEDevice + advertisement: AdvertisementData + + @classmethod + def from_advertisement( + cls, device: BLEDevice, advertisement_data: AdvertisementData, source: str + ) -> BluetoothServiceInfo: + """Create a BluetoothServiceInfoBleak from an advertisement.""" + return cls( + name=advertisement_data.local_name or device.name or device.address, + address=device.address, + rssi=device.rssi, + manufacturer_data=advertisement_data.manufacturer_data, + service_data=advertisement_data.service_data, + service_uuids=advertisement_data.service_uuids, + source=source, + device=device, + advertisement=advertisement_data, + ) + + class BluetoothCallbackMatcherOptional(TypedDict, total=False): """Matcher for the bluetooth integration for callback optional fields.""" @@ -75,13 +107,15 @@ MANUFACTURER_DATA_START: Final = "manufacturer_data_start" BluetoothChange = Enum("BluetoothChange", "ADVERTISEMENT") -BluetoothCallback = Callable[[BluetoothServiceInfo, BluetoothChange], None] +BluetoothCallback = Callable[ + [Union[BluetoothServiceInfoBleak, BluetoothServiceInfo], BluetoothChange], None +] @hass_callback def async_discovered_service_info( hass: HomeAssistant, -) -> list[BluetoothServiceInfo]: +) -> list[BluetoothServiceInfoBleak]: """Return the discovered devices list.""" if DOMAIN not in hass.data: return [] @@ -89,6 +123,18 @@ def async_discovered_service_info( return manager.async_discovered_service_info() +@hass_callback +def async_ble_device_from_address( + hass: HomeAssistant, + address: str, +) -> BLEDevice | None: + """Return BLEDevice for an address if its present.""" + if DOMAIN not in hass.data: + return None + manager: BluetoothManager = hass.data[DOMAIN] + return manager.async_ble_device_from_address(address) + + @hass_callback def async_address_present( hass: HomeAssistant, @@ -263,13 +309,13 @@ class BluetoothManager: if not matched_domains and not self._callbacks: return - service_info: BluetoothServiceInfo | None = None + service_info: BluetoothServiceInfoBleak | None = None for callback, matcher in self._callbacks: if matcher is None or _ble_device_matches( matcher, device, advertisement_data ): if service_info is None: - service_info = BluetoothServiceInfo.from_advertisement( + service_info = BluetoothServiceInfoBleak.from_advertisement( device, advertisement_data, SOURCE_LOCAL ) try: @@ -280,7 +326,7 @@ class BluetoothManager: if not matched_domains: return if service_info is None: - service_info = BluetoothServiceInfo.from_advertisement( + service_info = BluetoothServiceInfoBleak.from_advertisement( device, advertisement_data, SOURCE_LOCAL ) for domain in matched_domains: @@ -316,7 +362,7 @@ class BluetoothManager: ): try: callback( - BluetoothServiceInfo.from_advertisement( + BluetoothServiceInfoBleak.from_advertisement( *device_adv_data, SOURCE_LOCAL ), BluetoothChange.ADVERTISEMENT, @@ -326,6 +372,15 @@ class BluetoothManager: return _async_remove_callback + @hass_callback + def async_ble_device_from_address(self, address: str) -> BLEDevice | None: + """Return the BLEDevice if present.""" + if models.HA_BLEAK_SCANNER and ( + ble_adv := models.HA_BLEAK_SCANNER.history.get(address) + ): + return ble_adv[0] + return None + @hass_callback def async_address_present(self, address: str) -> bool: """Return if the address is present.""" @@ -344,7 +399,7 @@ class BluetoothManager: discovered = models.HA_BLEAK_SCANNER.discovered_devices history = models.HA_BLEAK_SCANNER.history return [ - BluetoothServiceInfo.from_advertisement( + BluetoothServiceInfoBleak.from_advertisement( *history[device.address], SOURCE_LOCAL ) for device in discovered diff --git a/homeassistant/components/bluetooth/models.py b/homeassistant/components/bluetooth/models.py index eda94305aa9..fd7559aeefa 100644 --- a/homeassistant/components/bluetooth/models.py +++ b/homeassistant/components/bluetooth/models.py @@ -2,6 +2,7 @@ from __future__ import annotations import asyncio +from collections.abc import Mapping import contextlib import logging from typing import Any, Final, cast @@ -56,7 +57,9 @@ class HaBleakScanner(BleakScanner): # type: ignore[misc] self._callbacks: list[ tuple[AdvertisementDataCallback, dict[str, set[str]]] ] = [] - self.history: LRU = LRU(MAX_HISTORY_SIZE) + self.history: Mapping[str, tuple[BLEDevice, AdvertisementData]] = LRU( + MAX_HISTORY_SIZE + ) super().__init__(*args, **kwargs) @hass_callback @@ -87,7 +90,7 @@ class HaBleakScanner(BleakScanner): # type: ignore[misc] Here we get the actual callback from bleak and dispatch it to all the wrapped HaBleakScannerWrapper classes """ - self.history[device.address] = (device, advertisement_data) + self.history[device.address] = (device, advertisement_data) # type: ignore[index] for callback_filters in self._callbacks: _dispatch_callback(*callback_filters, device, advertisement_data) diff --git a/tests/components/bluetooth/test_init.py b/tests/components/bluetooth/test_init.py index a31f911f6a7..1a002e5e354 100644 --- a/tests/components/bluetooth/test_init.py +++ b/tests/components/bluetooth/test_init.py @@ -248,6 +248,8 @@ async def test_async_discovered_device_api(hass, mock_bleak_scanner_start): # wrong_name should not appear because bleak no longer sees it assert service_infos[0].name == "wohand" assert service_infos[0].source == SOURCE_LOCAL + assert isinstance(service_infos[0].device, BLEDevice) + assert isinstance(service_infos[0].advertisement, AdvertisementData) assert bluetooth.async_address_present(hass, "44:44:33:11:23:42") is False assert bluetooth.async_address_present(hass, "44:44:33:11:23:45") is True @@ -694,3 +696,43 @@ async def test_wrapped_instance_unsupported_filter( } ) assert "Only UUIDs filters are supported" in caplog.text + + +async def test_async_ble_device_from_address(hass, mock_bleak_scanner_start): + """Test the async_ble_device_from_address api.""" + mock_bt = [] + with patch( + "homeassistant.components.bluetooth.async_get_bluetooth", return_value=mock_bt + ), patch( + "bleak.BleakScanner.discovered_devices", # Must patch before we setup + [MagicMock(address="44:44:33:11:23:45")], + ): + assert not bluetooth.async_discovered_service_info(hass) + assert not bluetooth.async_address_present(hass, "44:44:22:22:11:22") + assert ( + bluetooth.async_ble_device_from_address(hass, "44:44:33:11:23:45") is None + ) + + assert await async_setup_component( + hass, bluetooth.DOMAIN, {bluetooth.DOMAIN: {}} + ) + hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED) + await hass.async_block_till_done() + + assert len(mock_bleak_scanner_start.mock_calls) == 1 + + assert not bluetooth.async_discovered_service_info(hass) + + switchbot_device = BLEDevice("44:44:33:11:23:45", "wohand") + switchbot_adv = AdvertisementData(local_name="wohand", service_uuids=[]) + models.HA_BLEAK_SCANNER._callback(switchbot_device, switchbot_adv) + await hass.async_block_till_done() + + assert ( + bluetooth.async_ble_device_from_address(hass, "44:44:33:11:23:45") + is switchbot_device + ) + + assert ( + bluetooth.async_ble_device_from_address(hass, "00:66:33:22:11:22") is None + )