Add support for scanners that do not provide connectable devices (#77132)

This commit is contained in:
J. Nick Koston 2022-08-22 08:02:26 -10:00 committed by GitHub
parent 61ff1b786b
commit 3938015c93
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
44 changed files with 1088 additions and 385 deletions

View File

@ -2,15 +2,15 @@
from __future__ import annotations
from asyncio import Future
from collections.abc import Callable
from collections.abc import Callable, Iterable
import platform
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, cast
import async_timeout
from homeassistant import config_entries
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
from homeassistant.core import HomeAssistant, callback as hass_callback
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback as hass_callback
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers import device_registry as dr, discovery_flow
from homeassistant.loader import async_get_bluetooth
@ -31,6 +31,7 @@ from .const import (
from .manager import BluetoothManager
from .match import BluetoothCallbackMatcher, IntegrationMatcher
from .models import (
BaseHaScanner,
BluetoothCallback,
BluetoothChange,
BluetoothScanningMode,
@ -56,6 +57,7 @@ __all__ = [
"async_rediscover_address",
"async_register_callback",
"async_track_unavailable",
"BaseHaScanner",
"BluetoothServiceInfo",
"BluetoothServiceInfoBleak",
"BluetoothScanningMode",
@ -64,6 +66,11 @@ __all__ = [
]
def _get_manager(hass: HomeAssistant) -> BluetoothManager:
"""Get the bluetooth manager."""
return cast(BluetoothManager, hass.data[DATA_MANAGER])
@hass_callback
def async_get_scanner(hass: HomeAssistant) -> HaBleakScannerWrapper:
"""Return a HaBleakScannerWrapper.
@ -76,37 +83,32 @@ def async_get_scanner(hass: HomeAssistant) -> HaBleakScannerWrapper:
@hass_callback
def async_discovered_service_info(
hass: HomeAssistant,
) -> list[BluetoothServiceInfoBleak]:
hass: HomeAssistant, connectable: bool = True
) -> Iterable[BluetoothServiceInfoBleak]:
"""Return the discovered devices list."""
if DATA_MANAGER not in hass.data:
return []
manager: BluetoothManager = hass.data[DATA_MANAGER]
return manager.async_discovered_service_info()
return _get_manager(hass).async_discovered_service_info(connectable)
@hass_callback
def async_ble_device_from_address(
hass: HomeAssistant,
address: str,
hass: HomeAssistant, address: str, connectable: bool = True
) -> BLEDevice | None:
"""Return BLEDevice for an address if its present."""
if DATA_MANAGER not in hass.data:
return None
manager: BluetoothManager = hass.data[DATA_MANAGER]
return manager.async_ble_device_from_address(address)
return _get_manager(hass).async_ble_device_from_address(address, connectable)
@hass_callback
def async_address_present(
hass: HomeAssistant,
address: str,
hass: HomeAssistant, address: str, connectable: bool = True
) -> bool:
"""Check if an address is present in the bluetooth device list."""
if DATA_MANAGER not in hass.data:
return False
manager: BluetoothManager = hass.data[DATA_MANAGER]
return manager.async_address_present(address)
return _get_manager(hass).async_address_present(address, connectable)
@hass_callback
@ -125,8 +127,7 @@ def async_register_callback(
Returns a callback that can be used to cancel the registration.
"""
manager: BluetoothManager = hass.data[DATA_MANAGER]
return manager.async_register_callback(callback, match_dict)
return _get_manager(hass).async_register_callback(callback, match_dict)
async def async_process_advertisements(
@ -146,7 +147,9 @@ async def async_process_advertisements(
if not done.done() and callback(service_info):
done.set_result(service_info)
unload = async_register_callback(hass, _async_discovered_device, match_dict, mode)
unload = _get_manager(hass).async_register_callback(
_async_discovered_device, match_dict
)
try:
async with async_timeout.timeout(timeout):
@ -160,26 +163,47 @@ def async_track_unavailable(
hass: HomeAssistant,
callback: Callable[[str], None],
address: str,
connectable: bool = True,
) -> Callable[[], None]:
"""Register to receive a callback when an address is unavailable.
Returns a callback that can be used to cancel the registration.
"""
manager: BluetoothManager = hass.data[DATA_MANAGER]
return manager.async_track_unavailable(callback, address)
return _get_manager(hass).async_track_unavailable(callback, address, connectable)
@hass_callback
def async_rediscover_address(hass: HomeAssistant, address: str) -> None:
"""Trigger discovery of devices which have already been seen."""
manager: BluetoothManager = hass.data[DATA_MANAGER]
manager.async_rediscover_address(address)
_get_manager(hass).async_rediscover_address(address)
@hass_callback
def async_register_scanner(
hass: HomeAssistant, scanner: BaseHaScanner, connectable: bool
) -> CALLBACK_TYPE:
"""Register a BleakScanner."""
return _get_manager(hass).async_register_scanner(scanner, connectable)
@hass_callback
def async_get_advertisement_callback(
hass: HomeAssistant,
) -> Callable[[BluetoothServiceInfoBleak], None]:
"""Get the advertisement callback."""
return _get_manager(hass).scanner_adv_received
async def async_get_adapter_from_address(
hass: HomeAssistant, address: str
) -> str | None:
"""Get an adapter by the address."""
return await _get_manager(hass).async_get_adapter_from_address(address)
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the bluetooth integration."""
integration_matcher = IntegrationMatcher(await async_get_bluetooth(hass))
manager = BluetoothManager(hass, integration_matcher)
manager.async_setup()
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, manager.async_stop)
@ -236,10 +260,9 @@ async def async_discover_adapters(
async def async_update_device(
hass: HomeAssistant,
entry: config_entries.ConfigEntry,
manager: BluetoothManager,
adapter: str,
address: str,
) -> None:
"""Update device registry entry.
@ -248,6 +271,7 @@ async def async_update_device(
update the device with the new location so they can
figure out where the adapter is.
"""
manager: BluetoothManager = hass.data[DATA_MANAGER]
adapters = await manager.async_get_bluetooth_adapters()
details = adapters[adapter]
registry = dr.async_get(manager.hass)
@ -264,10 +288,9 @@ async def async_setup_entry(
hass: HomeAssistant, entry: config_entries.ConfigEntry
) -> bool:
"""Set up a config entry for a bluetooth scanner."""
manager: BluetoothManager = hass.data[DATA_MANAGER]
address = entry.unique_id
assert address is not None
adapter = await manager.async_get_adapter_from_address(address)
adapter = await async_get_adapter_from_address(hass, address)
if adapter is None:
raise ConfigEntryNotReady(
f"Bluetooth adapter {adapter} with address {address} not found"
@ -280,13 +303,14 @@ async def async_setup_entry(
f"{adapter_human_name(adapter, address)}: {err}"
) from err
scanner = HaScanner(hass, bleak_scanner, adapter, address)
entry.async_on_unload(scanner.async_register_callback(manager.scanner_adv_received))
info_callback = async_get_advertisement_callback(hass)
entry.async_on_unload(scanner.async_register_callback(info_callback))
try:
await scanner.async_start()
except ScannerStartError as err:
raise ConfigEntryNotReady from err
entry.async_on_unload(manager.async_register_scanner(scanner))
await async_update_device(entry, manager, adapter, address)
entry.async_on_unload(async_register_scanner(hass, scanner, True))
await async_update_device(hass, entry, adapter)
hass.data.setdefault(DOMAIN, {})[entry.entry_id] = scanner
return True

View File

@ -61,9 +61,10 @@ class ActiveBluetoothProcessorCoordinator(
]
| None = None,
poll_debouncer: Debouncer[Coroutine[Any, Any, None]] | None = None,
connectable: bool = True,
) -> None:
"""Initialize the processor."""
super().__init__(hass, logger, address, mode, update_method)
super().__init__(hass, logger, address, mode, update_method, connectable)
self._needs_poll_method = needs_poll_method
self._poll_method = poll_method

View File

@ -2,7 +2,6 @@
from __future__ import annotations
from collections.abc import Callable, Iterable
from dataclasses import dataclass
from datetime import datetime, timedelta
import itertools
import logging
@ -22,18 +21,23 @@ from homeassistant.helpers.event import async_track_time_interval
from .const import (
ADAPTER_ADDRESS,
SOURCE_LOCAL,
STALE_ADVERTISEMENT_SECONDS,
UNAVAILABLE_TRACK_SECONDS,
AdapterDetails,
)
from .match import (
ADDRESS,
CONNECTABLE,
BluetoothCallbackMatcher,
IntegrationMatcher,
ble_device_matches,
)
from .models import BluetoothCallback, BluetoothChange, BluetoothServiceInfoBleak
from .models import (
BaseHaScanner,
BluetoothCallback,
BluetoothChange,
BluetoothServiceInfoBleak,
)
from .usage import install_multiple_bleak_catcher, uninstall_multiple_bleak_catcher
from .util import async_get_bluetooth_adapters
@ -41,7 +45,6 @@ if TYPE_CHECKING:
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData
from .scanner import HaScanner
FILTER_UUIDS: Final = "UUIDs"
@ -51,43 +54,39 @@ RSSI_SWITCH_THRESHOLD = 6
_LOGGER = logging.getLogger(__name__)
@dataclass
class AdvertisementHistory:
"""Bluetooth advertisement history."""
ble_device: BLEDevice
advertisement_data: AdvertisementData
time: float
source: str
def _prefer_previous_adv(old: AdvertisementHistory, new: AdvertisementHistory) -> bool:
def _prefer_previous_adv(
old: BluetoothServiceInfoBleak, new: BluetoothServiceInfoBleak
) -> bool:
"""Prefer previous advertisement if it is better."""
if new.time - old.time > STALE_ADVERTISEMENT_SECONDS:
# If the old advertisement is stale, any new advertisement is preferred
if new.source != old.source:
_LOGGER.debug(
"%s (%s): Switching from %s to %s (time_elapsed:%s > stale_seconds:%s)",
new.advertisement_data.local_name,
new.ble_device.address,
"%s (%s): Switching from %s[%s] to %s[%s] (time elapsed:%s > stale seconds:%s)",
new.advertisement.local_name,
new.device.address,
old.source,
old.connectable,
new.source,
new.connectable,
new.time - old.time,
STALE_ADVERTISEMENT_SECONDS,
)
return False
if new.ble_device.rssi - RSSI_SWITCH_THRESHOLD > old.ble_device.rssi:
if new.device.rssi - RSSI_SWITCH_THRESHOLD > old.device.rssi:
# If new advertisement is RSSI_SWITCH_THRESHOLD more, the new one is preferred
if new.source != old.source:
_LOGGER.debug(
"%s (%s): Switching from %s to %s (new_rssi:%s - threadshold:%s > old_rssi:%s)",
new.advertisement_data.local_name,
new.ble_device.address,
"%s (%s): Switching from %s[%s] to %s[%s] (new rssi:%s - threshold:%s > old rssi:%s)",
new.advertisement.local_name,
new.device.address,
old.source,
old.connectable,
new.source,
new.ble_device.rssi,
new.connectable,
new.device.rssi,
RSSI_SWITCH_THRESHOLD,
old.ble_device.rssi,
old.device.rssi,
)
return False
# If the source is the different, the old one is preferred because its
@ -128,16 +127,24 @@ class BluetoothManager:
"""Init bluetooth manager."""
self.hass = hass
self._integration_matcher = integration_matcher
self._cancel_unavailable_tracking: CALLBACK_TYPE | None = None
self._cancel_unavailable_tracking: list[CALLBACK_TYPE] = []
self._unavailable_callbacks: dict[str, list[Callable[[str], None]]] = {}
self._connectable_unavailable_callbacks: dict[
str, list[Callable[[str], None]]
] = {}
self._callbacks: list[
tuple[BluetoothCallback, BluetoothCallbackMatcher | None]
] = []
self._connectable_callbacks: list[
tuple[BluetoothCallback, BluetoothCallbackMatcher | None]
] = []
self._bleak_callbacks: list[
tuple[AdvertisementDataCallback, dict[str, set[str]]]
] = []
self.history: dict[str, AdvertisementHistory] = {}
self._scanners: list[HaScanner] = []
self._history: dict[str, BluetoothServiceInfoBleak] = {}
self._connectable_history: dict[str, BluetoothServiceInfoBleak] = {}
self._scanners: list[BaseHaScanner] = []
self._connectable_scanners: list[BaseHaScanner] = []
self._adapters: dict[str, AdapterDetails] = {}
def _find_adapter_by_address(self, address: str) -> str | None:
@ -146,9 +153,11 @@ class BluetoothManager:
return adapter
return None
async def async_get_bluetooth_adapters(self) -> dict[str, AdapterDetails]:
async def async_get_bluetooth_adapters(
self, cached: bool = True
) -> dict[str, AdapterDetails]:
"""Get bluetooth adapters."""
if not self._adapters:
if not cached or not self._adapters:
self._adapters = await async_get_bluetooth_adapters()
return self._adapters
@ -170,37 +179,51 @@ class BluetoothManager:
"""Stop the Bluetooth integration at shutdown."""
_LOGGER.debug("Stopping bluetooth manager")
if self._cancel_unavailable_tracking:
self._cancel_unavailable_tracking()
self._cancel_unavailable_tracking = None
for cancel in self._cancel_unavailable_tracking:
cancel()
self._cancel_unavailable_tracking.clear()
uninstall_multiple_bleak_catcher()
@hass_callback
def async_all_discovered_devices(self) -> Iterable[BLEDevice]:
def async_all_discovered_devices(self, connectable: bool) -> Iterable[BLEDevice]:
"""Return all of discovered devices from all the scanners including duplicates."""
return itertools.chain.from_iterable(
scanner.discovered_devices for scanner in self._scanners
scanner.discovered_devices
for scanner in self._get_scanners_by_type(connectable)
)
@hass_callback
def async_discovered_devices(self) -> list[BLEDevice]:
def async_discovered_devices(self, connectable: bool) -> list[BLEDevice]:
"""Return all of combined best path to discovered from all the scanners."""
return [history.ble_device for history in self.history.values()]
return [
history.device
for history in self._get_history_by_type(connectable).values()
]
@hass_callback
def async_setup_unavailable_tracking(self) -> None:
"""Set up the unavailable tracking."""
self._async_setup_unavailable_tracking(True)
self._async_setup_unavailable_tracking(False)
@hass_callback
def _async_setup_unavailable_tracking(self, connectable: bool) -> None:
"""Set up the unavailable tracking."""
unavailable_callbacks = self._get_unavailable_callbacks_by_type(connectable)
history = self._get_history_by_type(connectable)
@hass_callback
def _async_check_unavailable(now: datetime) -> None:
"""Watch for unavailable devices."""
history_set = set(self.history)
history_set = set(history)
active_addresses = {
device.address for device in self.async_all_discovered_devices()
device.address
for device in self.async_all_discovered_devices(connectable)
}
disappeared = history_set.difference(active_addresses)
for address in disappeared:
del self.history[address]
if not (callbacks := self._unavailable_callbacks.get(address)):
del history[address]
if not (callbacks := unavailable_callbacks.get(address)):
continue
for callback in callbacks:
try:
@ -208,20 +231,16 @@ class BluetoothManager:
except Exception: # pylint: disable=broad-except
_LOGGER.exception("Error in unavailable callback")
self._cancel_unavailable_tracking = async_track_time_interval(
self.hass,
_async_check_unavailable,
timedelta(seconds=UNAVAILABLE_TRACK_SECONDS),
self._cancel_unavailable_tracking.append(
async_track_time_interval(
self.hass,
_async_check_unavailable,
timedelta(seconds=UNAVAILABLE_TRACK_SECONDS),
)
)
@hass_callback
def scanner_adv_received(
self,
device: BLEDevice,
advertisement_data: AdvertisementData,
monotonic_time: float,
source: str,
) -> None:
def scanner_adv_received(self, service_info: BluetoothServiceInfoBleak) -> None:
"""Handle a new advertisement from any scanner.
Callbacks from all the scanners arrive here.
@ -233,42 +252,46 @@ class BluetoothManager:
than the source from the history or the timestamp
in the history is older than 180s
"""
new_history = AdvertisementHistory(
device, advertisement_data, monotonic_time, source
)
if (old_history := self.history.get(device.address)) and _prefer_previous_adv(
old_history, new_history
):
device = service_info.device
connectable = service_info.connectable
address = device.address
all_history = self._get_history_by_type(connectable)
old_service_info = all_history.get(address)
if old_service_info and _prefer_previous_adv(old_service_info, service_info):
return
self.history[device.address] = new_history
self._history[address] = service_info
advertisement_data = service_info.advertisement
source = service_info.source
for callback_filters in self._bleak_callbacks:
_dispatch_bleak_callback(*callback_filters, device, advertisement_data)
if connectable:
self._connectable_history[address] = service_info
# Bleak callbacks must get a connectable device
matched_domains = self._integration_matcher.match_domains(
device, advertisement_data
)
for callback_filters in self._bleak_callbacks:
_dispatch_bleak_callback(*callback_filters, device, advertisement_data)
matched_domains = self._integration_matcher.match_domains(service_info)
_LOGGER.debug(
"%s: %s %s match: %s",
"%s: %s %s connectable: %s match: %s",
source,
device.address,
address,
advertisement_data,
connectable,
matched_domains,
)
if not matched_domains and not self._callbacks:
if (
not matched_domains
and not self._callbacks
and not self._connectable_callbacks
):
return
service_info: BluetoothServiceInfoBleak | None = None
for callback, matcher in self._callbacks:
if matcher is None or ble_device_matches(
matcher, device, advertisement_data
):
if service_info is None:
service_info = BluetoothServiceInfoBleak.from_advertisement(
device, advertisement_data, source
)
for connectable_callback in (True, False):
for callback, matcher in self._get_callbacks_by_type(connectable_callback):
if matcher and not ble_device_matches(matcher, service_info):
continue
try:
callback(service_info, BluetoothChange.ADVERTISEMENT)
except Exception: # pylint: disable=broad-except
@ -276,10 +299,6 @@ class BluetoothManager:
if not matched_domains:
return
if service_info is None:
service_info = BluetoothServiceInfoBleak.from_advertisement(
device, advertisement_data, source
)
for domain in matched_domains:
discovery_flow.async_create_flow(
self.hass,
@ -290,16 +309,17 @@ class BluetoothManager:
@hass_callback
def async_track_unavailable(
self, callback: Callable[[str], None], address: str
self, callback: Callable[[str], None], address: str, connectable: bool
) -> Callable[[], None]:
"""Register a callback."""
self._unavailable_callbacks.setdefault(address, []).append(callback)
unavailable_callbacks = self._get_unavailable_callbacks_by_type(connectable)
unavailable_callbacks.setdefault(address, []).append(callback)
@hass_callback
def _async_remove_callback() -> None:
self._unavailable_callbacks[address].remove(callback)
if not self._unavailable_callbacks[address]:
del self._unavailable_callbacks[address]
unavailable_callbacks[address].remove(callback)
if not unavailable_callbacks[address]:
del unavailable_callbacks[address]
return _async_remove_callback
@ -307,70 +327,102 @@ class BluetoothManager:
def async_register_callback(
self,
callback: BluetoothCallback,
matcher: BluetoothCallbackMatcher | None = None,
matcher: BluetoothCallbackMatcher | None,
) -> Callable[[], None]:
"""Register a callback."""
if not matcher:
matcher = BluetoothCallbackMatcher(connectable=True)
if CONNECTABLE not in matcher:
matcher[CONNECTABLE] = True
connectable = matcher[CONNECTABLE]
callback_entry = (callback, matcher)
self._callbacks.append(callback_entry)
callbacks = self._get_callbacks_by_type(connectable)
callbacks.append(callback_entry)
@hass_callback
def _async_remove_callback() -> None:
self._callbacks.remove(callback_entry)
callbacks.remove(callback_entry)
# If we have history for the subscriber, we can trigger the callback
# immediately with the last packet so the subscriber can see the
# device.
all_history = self._get_history_by_type(connectable)
if (
matcher
and (address := matcher.get(ADDRESS))
and (history := self.history.get(address))
(address := matcher.get(ADDRESS))
and (service_info := all_history.get(address))
and ble_device_matches(matcher, service_info)
):
try:
callback(
BluetoothServiceInfoBleak.from_advertisement(
history.ble_device, history.advertisement_data, SOURCE_LOCAL
),
BluetoothChange.ADVERTISEMENT,
)
callback(service_info, BluetoothChange.ADVERTISEMENT)
except Exception: # pylint: disable=broad-except
_LOGGER.exception("Error in bluetooth callback")
return _async_remove_callback
@hass_callback
def async_ble_device_from_address(self, address: str) -> BLEDevice | None:
def async_ble_device_from_address(
self, address: str, connectable: bool
) -> BLEDevice | None:
"""Return the BLEDevice if present."""
if history := self.history.get(address):
return history.ble_device
all_history = self._get_history_by_type(connectable)
if history := all_history.get(address):
return history.device
return None
@hass_callback
def async_address_present(self, address: str) -> bool:
def async_address_present(self, address: str, connectable: bool) -> bool:
"""Return if the address is present."""
return address in self.history
return address in self._get_history_by_type(connectable)
@hass_callback
def async_discovered_service_info(self) -> list[BluetoothServiceInfoBleak]:
def async_discovered_service_info(
self, connectable: bool
) -> Iterable[BluetoothServiceInfoBleak]:
"""Return if the address is present."""
return [
BluetoothServiceInfoBleak.from_advertisement(
history.ble_device, history.advertisement_data, SOURCE_LOCAL
)
for history in self.history.values()
]
return self._get_history_by_type(connectable).values()
@hass_callback
def async_rediscover_address(self, address: str) -> None:
"""Trigger discovery of devices which have already been seen."""
self._integration_matcher.async_clear_address(address)
def async_register_scanner(self, scanner: HaScanner) -> CALLBACK_TYPE:
def _get_scanners_by_type(self, connectable: bool) -> list[BaseHaScanner]:
"""Return the scanners by type."""
return self._connectable_scanners if connectable else self._scanners
def _get_unavailable_callbacks_by_type(
self, connectable: bool
) -> dict[str, list[Callable[[str], None]]]:
"""Return the unavailable callbacks by type."""
return (
self._connectable_unavailable_callbacks
if connectable
else self._unavailable_callbacks
)
def _get_history_by_type(
self, connectable: bool
) -> dict[str, BluetoothServiceInfoBleak]:
"""Return the history by type."""
return self._connectable_history if connectable else self._history
def _get_callbacks_by_type(
self, connectable: bool
) -> list[tuple[BluetoothCallback, BluetoothCallbackMatcher | None]]:
"""Return the callbacks by type."""
return self._connectable_callbacks if connectable else self._callbacks
def async_register_scanner(
self, scanner: BaseHaScanner, connectable: bool
) -> CALLBACK_TYPE:
"""Register a new scanner."""
scanners = self._get_scanners_by_type(connectable)
def _unregister_scanner() -> None:
self._scanners.remove(scanner)
scanners.remove(scanner)
self._scanners.append(scanner)
scanners.append(scanner)
return _unregister_scanner
@hass_callback
@ -388,9 +440,9 @@ class BluetoothManager:
# Replay the history since otherwise we miss devices
# that were already discovered before the callback was registered
# or we are in passive mode
for history in self.history.values():
for history in self._connectable_history.values():
_dispatch_bleak_callback(
callback, filters, history.ble_device, history.advertisement_data
callback, filters, history.device, history.advertisement
)
return _remove_callback

View File

@ -9,10 +9,11 @@ from lru import LRU # pylint: disable=no-name-in-module
from homeassistant.loader import BluetoothMatcher, BluetoothMatcherOptional
from .models import BluetoothServiceInfoBleak
if TYPE_CHECKING:
from collections.abc import MutableMapping
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData
@ -20,6 +21,7 @@ MAX_REMEMBER_ADDRESSES: Final = 2048
ADDRESS: Final = "address"
CONNECTABLE: Final = "connectable"
LOCAL_NAME: Final = "local_name"
SERVICE_UUID: Final = "service_uuid"
SERVICE_DATA_UUID: Final = "service_data_uuid"
@ -50,14 +52,14 @@ class IntegrationMatchHistory:
def seen_all_fields(
previous_match: IntegrationMatchHistory, adv_data: AdvertisementData
previous_match: IntegrationMatchHistory, advertisement_data: AdvertisementData
) -> bool:
"""Return if we have seen all fields."""
if not previous_match.manufacturer_data and adv_data.manufacturer_data:
if not previous_match.manufacturer_data and advertisement_data.manufacturer_data:
return False
if not previous_match.service_data and adv_data.service_data:
if not previous_match.service_data and advertisement_data.service_data:
return False
if not previous_match.service_uuids and adv_data.service_uuids:
if not previous_match.service_uuids and advertisement_data.service_uuids:
return False
return True
@ -73,74 +75,93 @@ class IntegrationMatcher:
self._matched: MutableMapping[str, IntegrationMatchHistory] = LRU(
MAX_REMEMBER_ADDRESSES
)
self._matched_connectable: MutableMapping[str, IntegrationMatchHistory] = LRU(
MAX_REMEMBER_ADDRESSES
)
def async_clear_address(self, address: str) -> None:
"""Clear the history matches for a set of domains."""
self._matched.pop(address, None)
self._matched_connectable.pop(address, None)
def match_domains(self, device: BLEDevice, adv_data: AdvertisementData) -> set[str]:
def _get_matched_by_type(
self, connectable: bool
) -> MutableMapping[str, IntegrationMatchHistory]:
"""Return the matches by type."""
return self._matched_connectable if connectable else self._matched
def match_domains(self, service_info: BluetoothServiceInfoBleak) -> set[str]:
"""Return the domains that are matched."""
device = service_info.device
advertisement_data = service_info.advertisement
matched = self._get_matched_by_type(service_info.connectable)
matched_domains: set[str] = set()
if (previous_match := self._matched.get(device.address)) and seen_all_fields(
previous_match, adv_data
if (previous_match := matched.get(device.address)) and seen_all_fields(
previous_match, advertisement_data
):
# We have seen all fields so we can skip the rest of the matchers
return matched_domains
matched_domains = {
matcher["domain"]
for matcher in self._integration_matchers
if ble_device_matches(matcher, device, adv_data)
if ble_device_matches(matcher, service_info)
}
if not matched_domains:
return matched_domains
if previous_match:
previous_match.manufacturer_data |= bool(adv_data.manufacturer_data)
previous_match.service_data |= bool(adv_data.service_data)
previous_match.service_uuids |= bool(adv_data.service_uuids)
previous_match.manufacturer_data |= bool(
advertisement_data.manufacturer_data
)
previous_match.service_data |= bool(advertisement_data.service_data)
previous_match.service_uuids |= bool(advertisement_data.service_uuids)
else:
self._matched[device.address] = IntegrationMatchHistory(
manufacturer_data=bool(adv_data.manufacturer_data),
service_data=bool(adv_data.service_data),
service_uuids=bool(adv_data.service_uuids),
matched[device.address] = IntegrationMatchHistory(
manufacturer_data=bool(advertisement_data.manufacturer_data),
service_data=bool(advertisement_data.service_data),
service_uuids=bool(advertisement_data.service_uuids),
)
return matched_domains
def ble_device_matches(
matcher: BluetoothCallbackMatcher | BluetoothMatcher,
device: BLEDevice,
adv_data: AdvertisementData,
service_info: BluetoothServiceInfoBleak,
) -> bool:
"""Check if a ble device and advertisement_data matches the matcher."""
device = service_info.device
if (address := matcher.get(ADDRESS)) is not None and device.address != address:
return False
if matcher.get(CONNECTABLE, True) and not service_info.connectable:
return False
advertisement_data = service_info.advertisement
if (local_name := matcher.get(LOCAL_NAME)) is not None and not fnmatch.fnmatch(
adv_data.local_name or device.name or device.address,
advertisement_data.local_name or device.name or device.address,
local_name,
):
return False
if (
service_uuid := matcher.get(SERVICE_UUID)
) is not None and service_uuid not in adv_data.service_uuids:
) is not None and service_uuid not in advertisement_data.service_uuids:
return False
if (
service_data_uuid := matcher.get(SERVICE_DATA_UUID)
) is not None and service_data_uuid not in adv_data.service_data:
) is not None and service_data_uuid not in advertisement_data.service_data:
return False
if (
manfacturer_id := matcher.get(MANUFACTURER_ID)
) is not None and manfacturer_id not in adv_data.manufacturer_data:
) is not None and manfacturer_id not in advertisement_data.manufacturer_data:
return False
if (manufacturer_data_start := matcher.get(MANUFACTURER_DATA_START)) is not None:
manufacturer_data_start_bytes = bytearray(manufacturer_data_start)
if not any(
manufacturer_data.startswith(manufacturer_data_start_bytes)
for manufacturer_data in adv_data.manufacturer_data.values()
for manufacturer_data in advertisement_data.manufacturer_data.values()
):
return False

View File

@ -1,6 +1,7 @@
"""Models for bluetooth."""
from __future__ import annotations
from abc import abstractmethod
import asyncio
from collections.abc import Callable
import contextlib
@ -45,23 +46,8 @@ class BluetoothServiceInfoBleak(BluetoothServiceInfo):
device: BLEDevice
advertisement: AdvertisementData
@classmethod
def from_advertisement(
cls, device: BLEDevice, advertisement_data: AdvertisementData, source: str
) -> BluetoothServiceInfoBleak:
"""Create a BluetoothServiceInfoBleak from an advertisement."""
return cls(
name=advertisement_data.local_name or device.name or device.address,
address=device.address,
rssi=device.rssi,
manufacturer_data=advertisement_data.manufacturer_data,
service_data=advertisement_data.service_data,
service_uuids=advertisement_data.service_uuids,
source=source,
device=device,
advertisement=advertisement_data,
)
connectable: bool
time: float
class BluetoothScanningMode(Enum):
@ -76,6 +62,15 @@ BluetoothCallback = Callable[[BluetoothServiceInfoBleak, BluetoothChange], None]
ProcessAdvertisementCallback = Callable[[BluetoothServiceInfoBleak], bool]
class BaseHaScanner:
"""Base class for Ha Scanners."""
@property
@abstractmethod
def discovered_devices(self) -> list[BLEDevice]:
"""Return a list of discovered devices."""
class HaBleakScannerWrapper(BaseBleakScanner):
"""A wrapper that uses the single instance."""
@ -89,7 +84,7 @@ class HaBleakScannerWrapper(BaseBleakScanner):
"""Initialize the BleakScanner."""
self._detection_cancel: CALLBACK_TYPE | None = None
self._mapped_filters: dict[str, set[str]] = {}
self._adv_data_callback: AdvertisementDataCallback | None = None
self._advertisement_data_callback: AdvertisementDataCallback | None = None
remapped_kwargs = {
"detection_callback": detection_callback,
"service_uuids": service_uuids or [],
@ -136,7 +131,7 @@ class HaBleakScannerWrapper(BaseBleakScanner):
def discovered_devices(self) -> list[BLEDevice]:
"""Return a list of discovered devices."""
assert MANAGER is not None
return list(MANAGER.async_discovered_devices())
return list(MANAGER.async_discovered_devices(True))
def register_detection_callback(
self, callback: AdvertisementDataCallback | None
@ -146,15 +141,15 @@ class HaBleakScannerWrapper(BaseBleakScanner):
This method takes the callback and registers it with the long running
scanner.
"""
self._adv_data_callback = callback
self._advertisement_data_callback = callback
self._setup_detection_callback()
def _setup_detection_callback(self) -> None:
"""Set up the detection callback."""
if self._adv_data_callback is None:
if self._advertisement_data_callback is None:
return
self._cancel_callback()
super().register_detection_callback(self._adv_data_callback)
super().register_detection_callback(self._advertisement_data_callback)
assert MANAGER is not None
assert self._callback is not None
self._detection_cancel = MANAGER.async_register_bleak_callback(
@ -193,7 +188,7 @@ class HaBleakClientWrapper(BleakClient):
error_if_core=False,
)
assert MANAGER is not None
ble_device = MANAGER.async_ble_device_from_address(address_or_ble_device)
ble_device = MANAGER.async_ble_device_from_address(address_or_ble_device, True)
if ble_device is None:
raise BleakError(f"No device found for address {address_or_ble_device}")
super().__init__(ble_device, *args, **kwargs)

View File

@ -28,9 +28,10 @@ class PassiveBluetoothDataUpdateCoordinator(BasePassiveBluetoothCoordinator):
logger: logging.Logger,
address: str,
mode: BluetoothScanningMode,
connectable: bool = False,
) -> None:
"""Initialize PassiveBluetoothDataUpdateCoordinator."""
super().__init__(hass, logger, address, mode)
super().__init__(hass, logger, address, mode, connectable)
self._listeners: dict[CALLBACK_TYPE, tuple[CALLBACK_TYPE, object | None]] = {}
@callback

View File

@ -72,9 +72,10 @@ class PassiveBluetoothProcessorCoordinator(
address: str,
mode: BluetoothScanningMode,
update_method: Callable[[BluetoothServiceInfoBleak], _T],
connectable: bool = False,
) -> None:
"""Initialize the coordinator."""
super().__init__(hass, logger, address, mode)
super().__init__(hass, logger, address, mode, connectable)
self._processors: list[PassiveBluetoothDataProcessor] = []
self._update_method = update_method
self.last_update_success = True

View File

@ -32,7 +32,7 @@ from .const import (
SOURCE_LOCAL,
START_TIMEOUT,
)
from .models import BluetoothScanningMode
from .models import BaseHaScanner, BluetoothScanningMode, BluetoothServiceInfoBleak
from .util import adapter_human_name, async_reset_adapter
OriginalBleakScanner = bleak.BleakScanner
@ -92,7 +92,7 @@ def create_bleak_scanner(
raise RuntimeError(f"Failed to initialize Bluetooth: {ex}") from ex
class HaScanner:
class HaScanner(BaseHaScanner):
"""Operate and automatically recover a BleakScanner.
Multiple BleakScanner can be used at the same time
@ -119,9 +119,7 @@ class HaScanner:
self._cancel_watchdog: CALLBACK_TYPE | None = None
self._last_detection = 0.0
self._start_time = 0.0
self._callbacks: list[
Callable[[BLEDevice, AdvertisementData, float, str], None]
] = []
self._callbacks: list[Callable[[BluetoothServiceInfoBleak], None]] = []
self.name = adapter_human_name(adapter, address)
self.source = self.adapter or SOURCE_LOCAL
@ -132,7 +130,7 @@ class HaScanner:
@hass_callback
def async_register_callback(
self, callback: Callable[[BLEDevice, AdvertisementData, float, str], None]
self, callback: Callable[[BluetoothServiceInfoBleak], None]
) -> CALLBACK_TYPE:
"""Register a callback.
@ -149,7 +147,7 @@ class HaScanner:
@hass_callback
def _async_detection_callback(
self,
ble_device: BLEDevice,
device: BLEDevice,
advertisement_data: AdvertisementData,
) -> None:
"""Call the callback when an advertisement is received.
@ -168,8 +166,21 @@ class HaScanner:
# as the adapter is in a failure
# state if all the data is empty.
self._last_detection = callback_time
service_info = BluetoothServiceInfoBleak(
name=advertisement_data.local_name or device.name or device.address,
address=device.address,
rssi=device.rssi,
manufacturer_data=advertisement_data.manufacturer_data,
service_data=advertisement_data.service_data,
service_uuids=advertisement_data.service_uuids,
source=self.source,
device=device,
advertisement=advertisement_data,
connectable=True,
time=callback_time,
)
for callback in self._callbacks:
callback(ble_device, advertisement_data, callback_time, self.source)
callback(service_info)
async def async_start(self) -> None:
"""Start bluetooth scanner."""

View File

@ -28,12 +28,14 @@ class BasePassiveBluetoothCoordinator:
logger: logging.Logger,
address: str,
mode: BluetoothScanningMode,
connectable: bool,
) -> None:
"""Initialize the coordinator."""
self.hass = hass
self.logger = logger
self.name: str | None = None
self.address = address
self.connectable = connectable
self._cancel_track_unavailable: CALLBACK_TYPE | None = None
self._cancel_bluetooth_advertisements: CALLBACK_TYPE | None = None
self._present = False
@ -62,13 +64,13 @@ class BasePassiveBluetoothCoordinator:
self._cancel_bluetooth_advertisements = async_register_callback(
self.hass,
self._async_handle_bluetooth_event,
BluetoothCallbackMatcher(address=self.address),
BluetoothCallbackMatcher(
address=self.address, connectable=self.connectable
),
self.mode,
)
self._cancel_track_unavailable = async_track_unavailable(
self.hass,
self._async_handle_unavailable,
self.address,
self.hass, self._async_handle_unavailable, self.address, self.connectable
)
@callback

View File

@ -10,6 +10,7 @@ from bleak import BleakClient, BleakError
import voluptuous as vol
from homeassistant.components import bluetooth
from homeassistant.components.bluetooth.match import BluetoothCallbackMatcher
from homeassistant.components.device_tracker import (
PLATFORM_SCHEMA as PARENT_PLATFORM_SCHEMA,
)
@ -139,8 +140,20 @@ async def async_setup_scanner( # noqa: C901
) -> None:
"""Lookup Bluetooth LE devices and update status."""
battery = None
# We need one we can connect to since the tracker will
# accept devices from non-connectable sources
if service_info.connectable:
device = service_info.device
elif connectable_device := bluetooth.async_ble_device_from_address(
hass, service_info.device.address, True
):
device = connectable_device
else:
# The device can be seen by a passive tracker but we
# don't have a route to make a connection
return
try:
async with BleakClient(service_info.device) as client:
async with BleakClient(device) as client:
bat_char = await client.read_gatt_char(BATTERY_CHARACTERISTIC_UUID)
battery = ord(bat_char)
except asyncio.TimeoutError:
@ -192,12 +205,17 @@ async def async_setup_scanner( # noqa: C901
# interval so they do not get set to not_home when
# there have been no callbacks because the RSSI or
# other properties have not changed.
for service_info in bluetooth.async_discovered_service_info(hass):
for service_info in bluetooth.async_discovered_service_info(hass, False):
_async_update_ble(service_info, bluetooth.BluetoothChange.ADVERTISEMENT)
cancels = [
bluetooth.async_register_callback(
hass, _async_update_ble, None, bluetooth.BluetoothScanningMode.ACTIVE
hass,
_async_update_ble,
BluetoothCallbackMatcher(
connectable=False
), # We will take data from any source
bluetooth.BluetoothScanningMode.ACTIVE,
),
async_track_time_interval(hass, _async_refresh_ble, interval),
]

View File

@ -73,7 +73,7 @@ class GoveeConfigFlow(ConfigFlow, domain=DOMAIN):
)
current_addresses = self._async_current_ids()
for discovery_info in async_discovered_service_info(self.hass):
for discovery_info in async_discovered_service_info(self.hass, False):
address = discovery_info.address
if address in current_addresses or address in self._discovered_devices:
continue

View File

@ -4,36 +4,43 @@
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/govee_ble",
"bluetooth": [
{ "local_name": "Govee*" },
{ "local_name": "GVH5*" },
{ "local_name": "B5178*" },
{ "local_name": "Govee*", "connectable": false },
{ "local_name": "GVH5*", "connectable": false },
{ "local_name": "B5178*", "connectable": false },
{
"manufacturer_id": 6966,
"service_uuid": "00008451-0000-1000-8000-00805f9b34fb"
"service_uuid": "00008451-0000-1000-8000-00805f9b34fb",
"connectable": false
},
{
"manufacturer_id": 26589,
"service_uuid": "00008351-0000-1000-8000-00805f9b34fb"
"service_uuid": "00008351-0000-1000-8000-00805f9b34fb",
"connectable": false
},
{
"manufacturer_id": 18994,
"service_uuid": "00008551-0000-1000-8000-00805f9b34fb"
"service_uuid": "00008551-0000-1000-8000-00805f9b34fb",
"connectable": false
},
{
"manufacturer_id": 818,
"service_uuid": "00008551-0000-1000-8000-00805f9b34fb"
"service_uuid": "00008551-0000-1000-8000-00805f9b34fb",
"connectable": false
},
{
"manufacturer_id": 59970,
"service_uuid": "00008151-0000-1000-8000-00805f9b34fb"
"service_uuid": "00008151-0000-1000-8000-00805f9b34fb",
"connectable": false
},
{
"manufacturer_id": 14474,
"service_uuid": "00008151-0000-1000-8000-00805f9b34fb"
"service_uuid": "00008151-0000-1000-8000-00805f9b34fb",
"connectable": false
},
{
"manufacturer_id": 10032,
"service_uuid": "00008251-0000-1000-8000-00805f9b34fb"
"service_uuid": "00008251-0000-1000-8000-00805f9b34fb",
"connectable": false
}
],
"requirements": ["govee-ble==0.16.0"],

View File

@ -73,7 +73,7 @@ class INKBIRDConfigFlow(ConfigFlow, domain=DOMAIN):
)
current_addresses = self._async_current_ids()
for discovery_info in async_discovered_service_info(self.hass):
for discovery_info in async_discovered_service_info(self.hass, False):
address = discovery_info.address
if address in current_addresses or address in self._discovered_devices:
continue

View File

@ -4,11 +4,11 @@
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/inkbird",
"bluetooth": [
{ "local_name": "sps" },
{ "local_name": "Inkbird*" },
{ "local_name": "iBBQ*" },
{ "local_name": "xBBQ*" },
{ "local_name": "tps" }
{ "local_name": "sps", "connectable": false },
{ "local_name": "Inkbird*", "connectable": false },
{ "local_name": "iBBQ*", "connectable": false },
{ "local_name": "xBBQ*", "connectable": false },
{ "local_name": "tps", "connectable": false }
],
"requirements": ["inkbird-ble==0.5.5"],
"dependencies": ["bluetooth"],

View File

@ -73,7 +73,7 @@ class MoatConfigFlow(ConfigFlow, domain=DOMAIN):
)
current_addresses = self._async_current_ids()
for discovery_info in async_discovered_service_info(self.hass):
for discovery_info in async_discovered_service_info(self.hass, False):
address = discovery_info.address
if address in current_addresses or address in self._discovered_devices:
continue

View File

@ -3,7 +3,7 @@
"name": "Moat",
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/moat",
"bluetooth": [{ "local_name": "Moat_S*" }],
"bluetooth": [{ "local_name": "Moat_S*", "connectable": false }],
"requirements": ["moat-ble==0.1.1"],
"dependencies": ["bluetooth"],
"codeowners": ["@bdraco"],

View File

@ -100,7 +100,7 @@ class QingpingConfigFlow(ConfigFlow, domain=DOMAIN):
)
current_addresses = self._async_current_ids()
for discovery_info in async_discovered_service_info(self.hass):
for discovery_info in async_discovered_service_info(self.hass, False):
address = discovery_info.address
if address in current_addresses or address in self._discovered_devices:
continue

View File

@ -3,7 +3,7 @@
"name": "Qingping",
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/qingping",
"bluetooth": [{ "local_name": "Qingping*" }],
"bluetooth": [{ "local_name": "Qingping*", "connectable": false }],
"requirements": ["qingping-ble==0.3.0"],
"dependencies": ["bluetooth"],
"codeowners": ["@bdraco"],

View File

@ -73,7 +73,7 @@ class SensorPushConfigFlow(ConfigFlow, domain=DOMAIN):
)
current_addresses = self._async_current_ids()
for discovery_info in async_discovered_service_info(self.hass):
for discovery_info in async_discovered_service_info(self.hass, False):
address = discovery_info.address
if address in current_addresses or address in self._discovered_devices:
continue

View File

@ -5,7 +5,8 @@
"documentation": "https://www.home-assistant.io/integrations/sensorpush",
"bluetooth": [
{
"local_name": "SensorPush*"
"local_name": "SensorPush*",
"connectable": false
}
],
"requirements": ["sensorpush-ble==1.5.2"],

View File

@ -18,7 +18,13 @@ from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers import device_registry as dr
from .const import CONF_RETRY_COUNT, DEFAULT_RETRY_COUNT, DOMAIN, SupportedModels
from .const import (
CONF_RETRY_COUNT,
CONNECTABLE_SUPPORTED_MODEL_TYPES,
DEFAULT_RETRY_COUNT,
DOMAIN,
SupportedModels,
)
from .coordinator import SwitchbotDataUpdateCoordinator
PLATFORMS_BY_TYPE = {
@ -40,6 +46,7 @@ CLASS_BY_DEVICE = {
SupportedModels.PLUG.value: switchbot.SwitchbotPlugMini,
}
_LOGGER = logging.getLogger(__name__)
@ -65,8 +72,12 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
)
sensor_type: str = entry.data[CONF_SENSOR_TYPE]
# connectable means we can make connections to the device
connectable = sensor_type in CONNECTABLE_SUPPORTED_MODEL_TYPES.values()
address: str = entry.data[CONF_ADDRESS]
ble_device = bluetooth.async_ble_device_from_address(hass, address.upper())
ble_device = bluetooth.async_ble_device_from_address(
hass, address.upper(), connectable
)
if not ble_device:
raise ConfigEntryNotReady(
f"Could not find Switchbot {sensor_type} with address {address}"
@ -77,6 +88,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
password=entry.data.get(CONF_PASSWORD),
retry_count=entry.options[CONF_RETRY_COUNT],
)
coordinator = hass.data[DOMAIN][entry.entry_id] = SwitchbotDataUpdateCoordinator(
hass,
_LOGGER,
@ -84,6 +96,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
device,
entry.unique_id,
entry.data.get(CONF_NAME, entry.title),
connectable,
)
entry.async_on_unload(coordinator.async_start())
if not await coordinator.async_wait_ready():

View File

@ -16,7 +16,14 @@ from homeassistant.const import CONF_ADDRESS, CONF_PASSWORD, CONF_SENSOR_TYPE
from homeassistant.core import callback
from homeassistant.data_entry_flow import AbortFlow, FlowResult
from .const import CONF_RETRY_COUNT, DEFAULT_RETRY_COUNT, DOMAIN, SUPPORTED_MODEL_TYPES
from .const import (
CONF_RETRY_COUNT,
CONNECTABLE_SUPPORTED_MODEL_TYPES,
DEFAULT_RETRY_COUNT,
DOMAIN,
NON_CONNECTABLE_SUPPORTED_MODEL_TYPES,
SUPPORTED_MODEL_TYPES,
)
_LOGGER = logging.getLogger(__name__)
@ -67,6 +74,13 @@ class SwitchbotConfigFlow(ConfigFlow, domain=DOMAIN):
)
if not parsed or parsed.data.get("modelName") not in SUPPORTED_MODEL_TYPES:
return self.async_abort(reason="not_supported")
model_name = parsed.data.get("modelName")
if (
not discovery_info.connectable
and model_name in CONNECTABLE_SUPPORTED_MODEL_TYPES
):
# Source is not connectable but the model is connectable
return self.async_abort(reason="not_supported")
self._discovered_adv = parsed
data = parsed.data
self.context["title_placeholders"] = {
@ -133,18 +147,25 @@ class SwitchbotConfigFlow(ConfigFlow, domain=DOMAIN):
@callback
def _async_discover_devices(self) -> None:
current_addresses = self._async_current_ids()
for discovery_info in async_discovered_service_info(self.hass):
address = discovery_info.address
if (
format_unique_id(address) in current_addresses
or address in self._discovered_advs
):
continue
parsed = parse_advertisement_data(
discovery_info.device, discovery_info.advertisement
)
if parsed and parsed.data.get("modelName") in SUPPORTED_MODEL_TYPES:
self._discovered_advs[address] = parsed
for connectable in (True, False):
for discovery_info in async_discovered_service_info(self.hass, connectable):
address = discovery_info.address
if (
format_unique_id(address) in current_addresses
or address in self._discovered_advs
):
continue
parsed = parse_advertisement_data(
discovery_info.device, discovery_info.advertisement
)
if not parsed:
continue
model_name = parsed.data.get("modelName")
if (
discovery_info.connectable
and model_name in CONNECTABLE_SUPPORTED_MODEL_TYPES
) or model_name in NON_CONNECTABLE_SUPPORTED_MODEL_TYPES:
self._discovered_advs[address] = parsed
if not self._discovered_advs:
raise AbortFlow("no_unconfigured_devices")

View File

@ -23,16 +23,23 @@ class SupportedModels(StrEnum):
MOTION = "motion"
SUPPORTED_MODEL_TYPES = {
CONNECTABLE_SUPPORTED_MODEL_TYPES = {
SwitchbotModel.BOT: SupportedModels.BOT,
SwitchbotModel.CURTAIN: SupportedModels.CURTAIN,
SwitchbotModel.METER: SupportedModels.HYGROMETER,
SwitchbotModel.CONTACT_SENSOR: SupportedModels.CONTACT,
SwitchbotModel.PLUG_MINI: SupportedModels.PLUG,
SwitchbotModel.MOTION_SENSOR: SupportedModels.MOTION,
SwitchbotModel.COLOR_BULB: SupportedModels.BULB,
}
NON_CONNECTABLE_SUPPORTED_MODEL_TYPES = {
SwitchbotModel.METER: SupportedModels.HYGROMETER,
SwitchbotModel.CONTACT_SENSOR: SupportedModels.CONTACT,
SwitchbotModel.MOTION_SENSOR: SupportedModels.MOTION,
}
SUPPORTED_MODEL_TYPES = {
**CONNECTABLE_SUPPORTED_MODEL_TYPES,
**NON_CONNECTABLE_SUPPORTED_MODEL_TYPES,
}
# Config Defaults
DEFAULT_RETRY_COUNT = 3

View File

@ -41,10 +41,15 @@ class SwitchbotDataUpdateCoordinator(PassiveBluetoothDataUpdateCoordinator):
device: switchbot.SwitchbotDevice,
base_unique_id: str,
device_name: str,
connectable: bool,
) -> None:
"""Initialize global switchbot data updater."""
super().__init__(
hass, logger, ble_device.address, bluetooth.BluetoothScanningMode.ACTIVE
hass,
logger,
ble_device.address,
bluetooth.BluetoothScanningMode.ACTIVE,
connectable,
)
self.ble_device = ble_device
self.device = device

View File

@ -14,10 +14,12 @@
],
"bluetooth": [
{
"service_data_uuid": "0000fd3d-0000-1000-8000-00805f9b34fb"
"service_data_uuid": "0000fd3d-0000-1000-8000-00805f9b34fb",
"connectable": false
},
{
"service_uuid": "cba20d00-224d-11e6-9fb8-0002a5d5c51b"
"service_uuid": "cba20d00-224d-11e6-9fb8-0002a5d5c51b",
"connectable": false
}
],
"iot_class": "local_push",

View File

@ -10,6 +10,7 @@ from homeassistant import config_entries
from homeassistant.components.bluetooth import (
BluetoothScanningMode,
BluetoothServiceInfoBleak,
async_ble_device_from_address,
)
from homeassistant.components.bluetooth.active_update_coordinator import (
ActiveBluetoothProcessorCoordinator,
@ -64,7 +65,21 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
async def _async_poll(service_info: BluetoothServiceInfoBleak):
# BluetoothServiceInfoBleak is defined in HA, otherwise would just pass it
# directly to the Xiaomi code
return await data.async_poll(service_info.device)
# Make sure the device we have is one that we can connect with
# in case its coming from a passive scanner
if service_info.connectable:
connectable_device = service_info.device
elif device := async_ble_device_from_address(
hass, service_info.device.address, True
):
connectable_device = device
else:
# We have no bluetooth controller that is in range of
# the device to poll it
raise RuntimeError(
f"No connectable device found for {service_info.device.address}"
)
return await data.async_poll(connectable_device)
coordinator = hass.data.setdefault(DOMAIN, {})[
entry.entry_id
@ -78,6 +93,10 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
),
needs_poll_method=_needs_poll,
poll_method=_async_poll,
# We will take advertisements from non-connectable devices
# since we will trade the BLEDevice for a connectable one
# if we need to poll it
connectable=False,
)
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
entry.async_on_unload(

View File

@ -232,7 +232,7 @@ class XiaomiConfigFlow(ConfigFlow, domain=DOMAIN):
return self._async_get_or_create_entry()
current_addresses = self._async_current_ids()
for discovery_info in async_discovered_service_info(self.hass):
for discovery_info in async_discovered_service_info(self.hass, False):
address = discovery_info.address
if address in current_addresses or address in self._discovered_devices:
continue

View File

@ -5,6 +5,7 @@
"documentation": "https://www.home-assistant.io/integrations/xiaomi_ble",
"bluetooth": [
{
"connectable": false,
"service_data_uuid": "0000fe95-0000-1000-8000-00805f9b34fb"
}
],

View File

@ -6,7 +6,7 @@ from __future__ import annotations
# fmt: off
BLUETOOTH: list[dict[str, str | int | list[int]]] = [
BLUETOOTH: list[dict[str, bool | str | int | list[int]]] = [
{
"domain": "fjaraskupan",
"manufacturer_id": 20296,
@ -21,50 +21,60 @@ BLUETOOTH: list[dict[str, str | int | list[int]]] = [
},
{
"domain": "govee_ble",
"local_name": "Govee*"
"local_name": "Govee*",
"connectable": False
},
{
"domain": "govee_ble",
"local_name": "GVH5*"
"local_name": "GVH5*",
"connectable": False
},
{
"domain": "govee_ble",
"local_name": "B5178*"
"local_name": "B5178*",
"connectable": False
},
{
"domain": "govee_ble",
"manufacturer_id": 6966,
"service_uuid": "00008451-0000-1000-8000-00805f9b34fb"
"service_uuid": "00008451-0000-1000-8000-00805f9b34fb",
"connectable": False
},
{
"domain": "govee_ble",
"manufacturer_id": 26589,
"service_uuid": "00008351-0000-1000-8000-00805f9b34fb"
"service_uuid": "00008351-0000-1000-8000-00805f9b34fb",
"connectable": False
},
{
"domain": "govee_ble",
"manufacturer_id": 18994,
"service_uuid": "00008551-0000-1000-8000-00805f9b34fb"
"service_uuid": "00008551-0000-1000-8000-00805f9b34fb",
"connectable": False
},
{
"domain": "govee_ble",
"manufacturer_id": 818,
"service_uuid": "00008551-0000-1000-8000-00805f9b34fb"
"service_uuid": "00008551-0000-1000-8000-00805f9b34fb",
"connectable": False
},
{
"domain": "govee_ble",
"manufacturer_id": 59970,
"service_uuid": "00008151-0000-1000-8000-00805f9b34fb"
"service_uuid": "00008151-0000-1000-8000-00805f9b34fb",
"connectable": False
},
{
"domain": "govee_ble",
"manufacturer_id": 14474,
"service_uuid": "00008151-0000-1000-8000-00805f9b34fb"
"service_uuid": "00008151-0000-1000-8000-00805f9b34fb",
"connectable": False
},
{
"domain": "govee_ble",
"manufacturer_id": 10032,
"service_uuid": "00008251-0000-1000-8000-00805f9b34fb"
"service_uuid": "00008251-0000-1000-8000-00805f9b34fb",
"connectable": False
},
{
"domain": "homekit_controller",
@ -75,46 +85,57 @@ BLUETOOTH: list[dict[str, str | int | list[int]]] = [
},
{
"domain": "inkbird",
"local_name": "sps"
"local_name": "sps",
"connectable": False
},
{
"domain": "inkbird",
"local_name": "Inkbird*"
"local_name": "Inkbird*",
"connectable": False
},
{
"domain": "inkbird",
"local_name": "iBBQ*"
"local_name": "iBBQ*",
"connectable": False
},
{
"domain": "inkbird",
"local_name": "xBBQ*"
"local_name": "xBBQ*",
"connectable": False
},
{
"domain": "inkbird",
"local_name": "tps"
"local_name": "tps",
"connectable": False
},
{
"domain": "moat",
"local_name": "Moat_S*"
"local_name": "Moat_S*",
"connectable": False
},
{
"domain": "qingping",
"local_name": "Qingping*"
"local_name": "Qingping*",
"connectable": False
},
{
"domain": "sensorpush",
"local_name": "SensorPush*"
"local_name": "SensorPush*",
"connectable": False
},
{
"domain": "switchbot",
"service_data_uuid": "0000fd3d-0000-1000-8000-00805f9b34fb"
"service_data_uuid": "0000fd3d-0000-1000-8000-00805f9b34fb",
"connectable": False
},
{
"domain": "switchbot",
"service_uuid": "cba20d00-224d-11e6-9fb8-0002a5d5c51b"
"service_uuid": "cba20d00-224d-11e6-9fb8-0002a5d5c51b",
"connectable": False
},
{
"domain": "xiaomi_ble",
"connectable": False,
"service_data_uuid": "0000fe95-0000-1000-8000-00805f9b34fb"
},
{

View File

@ -91,6 +91,7 @@ class BluetoothMatcherOptional(TypedDict, total=False):
service_data_uuid: str
manufacturer_id: int
manufacturer_data_start: list[int]
connectable: bool
class BluetoothMatcher(BluetoothMatcherRequired, BluetoothMatcherOptional):

View File

@ -14,7 +14,7 @@ from __future__ import annotations
# fmt: off
BLUETOOTH: list[dict[str, str | int | list[int]]] = {}
BLUETOOTH: list[dict[str, bool | str | int | list[int]]] = {}
""".strip()
@ -36,7 +36,11 @@ def generate_and_validate(integrations: list[dict[str, str]]):
for entry in match_types:
match_list.append({"domain": domain, **entry})
return BASE.format(json.dumps(match_list, indent=4))
return BASE.format(
json.dumps(match_list, indent=4)
.replace('": true', '": True')
.replace('": false', '": False')
)
def validate(integrations: dict[str, Integration], config: Config):

View File

@ -197,6 +197,7 @@ MANIFEST_SCHEMA = vol.Schema(
vol.Optional("bluetooth"): [
vol.Schema(
{
vol.Optional("connectable"): bool,
vol.Optional("service_uuid"): vol.All(str, verify_lowercase),
vol.Optional("service_data_uuid"): vol.All(str, verify_lowercase),
vol.Optional("local_name"): vol.All(str),

View File

@ -6,7 +6,12 @@ from unittest.mock import patch
from bleak.backends.scanner import AdvertisementData, BLEDevice
from homeassistant.components.bluetooth import DOMAIN, SOURCE_LOCAL, models
from homeassistant.components.bluetooth import (
DOMAIN,
SOURCE_LOCAL,
async_get_advertisement_callback,
models,
)
from homeassistant.components.bluetooth.const import DEFAULT_ADDRESS
from homeassistant.components.bluetooth.manager import BluetoothManager
from homeassistant.core import HomeAssistant
@ -20,38 +25,84 @@ def _get_manager() -> BluetoothManager:
return models.MANAGER
def inject_advertisement(device: BLEDevice, adv: AdvertisementData) -> None:
def inject_advertisement(
hass: HomeAssistant, device: BLEDevice, adv: AdvertisementData
) -> None:
"""Inject an advertisement into the manager."""
return inject_advertisement_with_source(device, adv, SOURCE_LOCAL)
return inject_advertisement_with_source(hass, device, adv, SOURCE_LOCAL)
def inject_advertisement_with_source(
device: BLEDevice, adv: AdvertisementData, source: str
hass: HomeAssistant, device: BLEDevice, adv: AdvertisementData, source: str
) -> None:
"""Inject an advertisement into the manager from a specific source."""
inject_advertisement_with_time_and_source(device, adv, time.monotonic(), source)
inject_advertisement_with_time_and_source(
hass, device, adv, time.monotonic(), source
)
def inject_advertisement_with_time_and_source(
device: BLEDevice, adv: AdvertisementData, time: float, source: str
hass: HomeAssistant,
device: BLEDevice,
adv: AdvertisementData,
time: float,
source: str,
) -> None:
"""Inject an advertisement into the manager from a specific source at a time."""
return _get_manager().scanner_adv_received(device, adv, time, source)
inject_advertisement_with_time_and_source_connectable(
hass, device, adv, time, source, True
)
def inject_advertisement_with_time_and_source_connectable(
hass: HomeAssistant,
device: BLEDevice,
adv: AdvertisementData,
time: float,
source: str,
connectable: bool,
) -> None:
"""Inject an advertisement into the manager from a specific source at a time and connectable status."""
async_get_advertisement_callback(hass)(
models.BluetoothServiceInfoBleak(
name=adv.local_name or device.name or device.address,
address=device.address,
rssi=device.rssi,
manufacturer_data=adv.manufacturer_data,
service_data=adv.service_data,
service_uuids=adv.service_uuids,
source=source,
device=device,
advertisement=adv,
connectable=connectable,
time=time,
)
)
def patch_all_discovered_devices(mock_discovered: list[BLEDevice]) -> None:
"""Mock all the discovered devices from all the scanners."""
manager = _get_manager()
return patch.object(
manager, "async_all_discovered_devices", return_value=mock_discovered
_get_manager(), "async_all_discovered_devices", return_value=mock_discovered
)
def patch_history(mock_history: dict[str, models.BluetoothServiceInfoBleak]) -> None:
"""Patch the history."""
return patch.dict(_get_manager()._history, mock_history)
def patch_connectable_history(
mock_history: dict[str, models.BluetoothServiceInfoBleak]
) -> None:
"""Patch the connectable history."""
return patch.dict(_get_manager()._connectable_history, mock_history)
def patch_discovered_devices(mock_discovered: list[BLEDevice]) -> None:
"""Mock the combined best path to discovered devices from all the scanners."""
manager = _get_manager()
return patch.object(
manager, "async_discovered_devices", return_value=mock_discovered
_get_manager(), "async_discovered_devices", return_value=mock_discovered
)

View File

@ -1,7 +1,8 @@
"""Tests for the Bluetooth integration."""
import asyncio
from datetime import timedelta
from unittest.mock import MagicMock, patch
import time
from unittest.mock import MagicMock, Mock, patch
from bleak import BleakError
from bleak.backends.scanner import AdvertisementData, BLEDevice
@ -20,6 +21,7 @@ from homeassistant.components.bluetooth import (
)
from homeassistant.components.bluetooth.const import (
DEFAULT_ADDRESS,
DOMAIN,
SOURCE_LOCAL,
UNAVAILABLE_TRACK_SECONDS,
)
@ -33,6 +35,7 @@ from . import (
_get_manager,
async_setup_with_default_adapter,
inject_advertisement,
inject_advertisement_with_time_and_source_connectable,
patch_discovered_devices,
)
@ -228,7 +231,7 @@ async def test_discovery_match_by_service_uuid(
wrong_device = BLEDevice("44:44:33:11:23:45", "wrong_name")
wrong_adv = AdvertisementData(local_name="wrong_name", service_uuids=[])
inject_advertisement(wrong_device, wrong_adv)
inject_advertisement(hass, wrong_device, wrong_adv)
await hass.async_block_till_done()
assert len(mock_config_flow.mock_calls) == 0
@ -238,13 +241,160 @@ async def test_discovery_match_by_service_uuid(
local_name="wohand", service_uuids=["cba20d00-224d-11e6-9fb8-0002a5d5c51b"]
)
inject_advertisement(switchbot_device, switchbot_adv)
inject_advertisement(hass, switchbot_device, switchbot_adv)
await hass.async_block_till_done()
assert len(mock_config_flow.mock_calls) == 1
assert mock_config_flow.mock_calls[0][1][0] == "switchbot"
def _domains_from_mock_config_flow(mock_config_flow: Mock) -> list[str]:
"""Get all the domains that were passed to async_init except bluetooth."""
return [call[1][0] for call in mock_config_flow.mock_calls if call[1][0] != DOMAIN]
async def test_discovery_match_by_service_uuid_connectable(
hass, mock_bleak_scanner_start, macos_adapter
):
"""Test bluetooth discovery match by service_uuid and the ble device is connectable."""
mock_bt = [
{
"domain": "switchbot",
"connectable": True,
"service_uuid": "cba20d00-224d-11e6-9fb8-0002a5d5c51b",
}
]
with patch(
"homeassistant.components.bluetooth.async_get_bluetooth", return_value=mock_bt
), patch.object(hass.config_entries.flow, "async_init") as mock_config_flow:
await async_setup_with_default_adapter(hass)
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
await hass.async_block_till_done()
assert len(mock_bleak_scanner_start.mock_calls) == 1
wrong_device = BLEDevice("44:44:33:11:23:45", "wrong_name")
wrong_adv = AdvertisementData(local_name="wrong_name", service_uuids=[])
inject_advertisement_with_time_and_source_connectable(
hass, wrong_device, wrong_adv, time.monotonic(), "any", True
)
await hass.async_block_till_done()
assert len(_domains_from_mock_config_flow(mock_config_flow)) == 0
switchbot_device = BLEDevice("44:44:33:11:23:45", "wohand")
switchbot_adv = AdvertisementData(
local_name="wohand", service_uuids=["cba20d00-224d-11e6-9fb8-0002a5d5c51b"]
)
inject_advertisement_with_time_and_source_connectable(
hass, switchbot_device, switchbot_adv, time.monotonic(), "any", True
)
await hass.async_block_till_done()
called_domains = _domains_from_mock_config_flow(mock_config_flow)
assert len(called_domains) == 1
assert called_domains == ["switchbot"]
async def test_discovery_match_by_service_uuid_not_connectable(
hass, mock_bleak_scanner_start, macos_adapter
):
"""Test bluetooth discovery match by service_uuid and the ble device is not connectable."""
mock_bt = [
{
"domain": "switchbot",
"connectable": True,
"service_uuid": "cba20d00-224d-11e6-9fb8-0002a5d5c51b",
}
]
with patch(
"homeassistant.components.bluetooth.async_get_bluetooth", return_value=mock_bt
), patch.object(hass.config_entries.flow, "async_init") as mock_config_flow:
await async_setup_with_default_adapter(hass)
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
await hass.async_block_till_done()
assert len(mock_bleak_scanner_start.mock_calls) == 1
wrong_device = BLEDevice("44:44:33:11:23:45", "wrong_name")
wrong_adv = AdvertisementData(local_name="wrong_name", service_uuids=[])
inject_advertisement_with_time_and_source_connectable(
hass, wrong_device, wrong_adv, time.monotonic(), "any", False
)
await hass.async_block_till_done()
assert len(_domains_from_mock_config_flow(mock_config_flow)) == 0
switchbot_device = BLEDevice("44:44:33:11:23:45", "wohand")
switchbot_adv = AdvertisementData(
local_name="wohand", service_uuids=["cba20d00-224d-11e6-9fb8-0002a5d5c51b"]
)
inject_advertisement_with_time_and_source_connectable(
hass, switchbot_device, switchbot_adv, time.monotonic(), "any", False
)
await hass.async_block_till_done()
assert len(_domains_from_mock_config_flow(mock_config_flow)) == 0
async def test_discovery_match_by_name_connectable_false(
hass, mock_bleak_scanner_start, macos_adapter
):
"""Test bluetooth discovery match by name and the integration will take non-connectable devices."""
mock_bt = [
{
"domain": "qingping",
"connectable": False,
"local_name": "Qingping*",
}
]
with patch(
"homeassistant.components.bluetooth.async_get_bluetooth", return_value=mock_bt
), patch.object(hass.config_entries.flow, "async_init") as mock_config_flow:
await async_setup_with_default_adapter(hass)
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
await hass.async_block_till_done()
assert len(mock_bleak_scanner_start.mock_calls) == 1
wrong_device = BLEDevice("44:44:33:11:23:45", "wrong_name")
wrong_adv = AdvertisementData(local_name="wrong_name", service_uuids=[])
inject_advertisement_with_time_and_source_connectable(
hass, wrong_device, wrong_adv, time.monotonic(), "any", False
)
await hass.async_block_till_done()
assert len(_domains_from_mock_config_flow(mock_config_flow)) == 0
qingping_device = BLEDevice("44:44:33:11:23:45", "Qingping Motion & Light")
qingping_adv = AdvertisementData(
local_name="Qingping Motion & Light",
service_data={
"0000fdcd-0000-1000-8000-00805f9b34fb": b"H\x12\xcd\xd5`4-X\x08\x04\x01\xe8\x00\x00\x0f\x01{"
},
)
inject_advertisement_with_time_and_source_connectable(
hass, qingping_device, qingping_adv, time.monotonic(), "any", False
)
await hass.async_block_till_done()
assert _domains_from_mock_config_flow(mock_config_flow) == ["qingping"]
mock_config_flow.reset_mock()
# Make sure it will also take a connectable device
inject_advertisement_with_time_and_source_connectable(
hass, qingping_device, qingping_adv, time.monotonic(), "any", True
)
await hass.async_block_till_done()
assert _domains_from_mock_config_flow(mock_config_flow) == ["qingping"]
async def test_discovery_match_by_local_name(
hass, mock_bleak_scanner_start, macos_adapter
):
@ -264,7 +414,7 @@ async def test_discovery_match_by_local_name(
wrong_device = BLEDevice("44:44:33:11:23:45", "wrong_name")
wrong_adv = AdvertisementData(local_name="wrong_name", service_uuids=[])
inject_advertisement(wrong_device, wrong_adv)
inject_advertisement(hass, wrong_device, wrong_adv)
await hass.async_block_till_done()
assert len(mock_config_flow.mock_calls) == 0
@ -272,7 +422,7 @@ async def test_discovery_match_by_local_name(
switchbot_device = BLEDevice("44:44:33:11:23:45", "wohand")
switchbot_adv = AdvertisementData(local_name="wohand", service_uuids=[])
inject_advertisement(switchbot_device, switchbot_adv)
inject_advertisement(hass, switchbot_device, switchbot_adv)
await hass.async_block_till_done()
assert len(mock_config_flow.mock_calls) == 1
@ -315,21 +465,21 @@ async def test_discovery_match_by_manufacturer_id_and_manufacturer_data_start(
# 1st discovery with no manufacturer data
# should not trigger config flow
inject_advertisement(hkc_device, hkc_adv_no_mfr_data)
inject_advertisement(hass, hkc_device, hkc_adv_no_mfr_data)
await hass.async_block_till_done()
assert len(mock_config_flow.mock_calls) == 0
mock_config_flow.reset_mock()
# 2nd discovery with manufacturer data
# should trigger a config flow
inject_advertisement(hkc_device, hkc_adv)
inject_advertisement(hass, hkc_device, hkc_adv)
await hass.async_block_till_done()
assert len(mock_config_flow.mock_calls) == 1
assert mock_config_flow.mock_calls[0][1][0] == "homekit_controller"
mock_config_flow.reset_mock()
# 3rd discovery should not generate another flow
inject_advertisement(hkc_device, hkc_adv)
inject_advertisement(hass, hkc_device, hkc_adv)
await hass.async_block_till_done()
assert len(mock_config_flow.mock_calls) == 0
@ -340,7 +490,7 @@ async def test_discovery_match_by_manufacturer_id_and_manufacturer_data_start(
local_name="lock", service_uuids=[], manufacturer_data={76: b"\x02"}
)
inject_advertisement(not_hkc_device, not_hkc_adv)
inject_advertisement(hass, not_hkc_device, not_hkc_adv)
await hass.async_block_till_done()
assert len(mock_config_flow.mock_calls) == 0
@ -349,7 +499,7 @@ async def test_discovery_match_by_manufacturer_id_and_manufacturer_data_start(
local_name="lock", service_uuids=[], manufacturer_data={21: b"\x02"}
)
inject_advertisement(not_apple_device, not_apple_adv)
inject_advertisement(hass, not_apple_device, not_apple_adv)
await hass.async_block_till_done()
assert len(mock_config_flow.mock_calls) == 0
@ -422,21 +572,21 @@ async def test_discovery_match_by_service_data_uuid_then_others(
)
# 1st discovery should not generate a flow because the
# service_data_uuid is not in the advertisement
inject_advertisement(device, adv_without_service_data_uuid)
inject_advertisement(hass, device, adv_without_service_data_uuid)
await hass.async_block_till_done()
assert len(mock_config_flow.mock_calls) == 0
mock_config_flow.reset_mock()
# 2nd discovery should not generate a flow because the
# service_data_uuid is not in the advertisement
inject_advertisement(device, adv_without_service_data_uuid)
inject_advertisement(hass, device, adv_without_service_data_uuid)
await hass.async_block_till_done()
assert len(mock_config_flow.mock_calls) == 0
mock_config_flow.reset_mock()
# 3rd discovery should generate a flow because the
# manufacturer_data is in the advertisement
inject_advertisement(device, adv_with_mfr_data)
inject_advertisement(hass, device, adv_with_mfr_data)
await hass.async_block_till_done()
assert len(mock_config_flow.mock_calls) == 1
assert mock_config_flow.mock_calls[0][1][0] == "other_domain"
@ -445,7 +595,7 @@ async def test_discovery_match_by_service_data_uuid_then_others(
# 4th discovery should generate a flow because the
# service_data_uuid is in the advertisement and
# we never saw a service_data_uuid before
inject_advertisement(device, adv_with_service_data_uuid)
inject_advertisement(hass, device, adv_with_service_data_uuid)
await hass.async_block_till_done()
assert len(mock_config_flow.mock_calls) == 1
assert mock_config_flow.mock_calls[0][1][0] == "my_domain"
@ -453,14 +603,14 @@ async def test_discovery_match_by_service_data_uuid_then_others(
# 5th discovery should not generate a flow because the
# we already saw an advertisement with the service_data_uuid
inject_advertisement(device, adv_with_service_data_uuid)
inject_advertisement(hass, device, adv_with_service_data_uuid)
await hass.async_block_till_done()
assert len(mock_config_flow.mock_calls) == 0
# 6th discovery should not generate a flow because the
# manufacturer_data is in the advertisement
# and we saw manufacturer_data before
inject_advertisement(device, adv_with_service_data_uuid_and_mfr_data)
inject_advertisement(hass, device, adv_with_service_data_uuid_and_mfr_data)
await hass.async_block_till_done()
assert len(mock_config_flow.mock_calls) == 0
mock_config_flow.reset_mock()
@ -469,7 +619,7 @@ async def test_discovery_match_by_service_data_uuid_then_others(
# service_uuids is in the advertisement
# and we never saw service_uuids before
inject_advertisement(
device, adv_with_service_data_uuid_and_mfr_data_and_service_uuid
hass, device, adv_with_service_data_uuid_and_mfr_data_and_service_uuid
)
await hass.async_block_till_done()
assert len(mock_config_flow.mock_calls) == 2
@ -482,7 +632,7 @@ async def test_discovery_match_by_service_data_uuid_then_others(
# 8th discovery should not generate a flow
# since all fields have been seen at this point
inject_advertisement(
device, adv_with_service_data_uuid_and_mfr_data_and_service_uuid
hass, device, adv_with_service_data_uuid_and_mfr_data_and_service_uuid
)
await hass.async_block_till_done()
assert len(mock_config_flow.mock_calls) == 0
@ -490,19 +640,19 @@ async def test_discovery_match_by_service_data_uuid_then_others(
# 9th discovery should not generate a flow
# since all fields have been seen at this point
inject_advertisement(device, adv_with_service_uuid)
inject_advertisement(hass, device, adv_with_service_uuid)
await hass.async_block_till_done()
assert len(mock_config_flow.mock_calls) == 0
# 10th discovery should not generate a flow
# since all fields have been seen at this point
inject_advertisement(device, adv_with_service_data_uuid)
inject_advertisement(hass, device, adv_with_service_data_uuid)
await hass.async_block_till_done()
assert len(mock_config_flow.mock_calls) == 0
# 11th discovery should not generate a flow
# since all fields have been seen at this point
inject_advertisement(device, adv_without_service_data_uuid)
inject_advertisement(hass, device, adv_without_service_data_uuid)
await hass.async_block_till_done()
assert len(mock_config_flow.mock_calls) == 0
@ -546,7 +696,7 @@ async def test_discovery_match_first_by_service_uuid_and_then_manufacturer_id(
# 1st discovery with matches service_uuid
# should trigger config flow
inject_advertisement(device, adv_service_uuids)
inject_advertisement(hass, device, adv_service_uuids)
await hass.async_block_till_done()
assert len(mock_config_flow.mock_calls) == 1
assert mock_config_flow.mock_calls[0][1][0] == "my_domain"
@ -554,19 +704,19 @@ async def test_discovery_match_first_by_service_uuid_and_then_manufacturer_id(
# 2nd discovery with manufacturer data
# should trigger a config flow
inject_advertisement(device, adv_manufacturer_data)
inject_advertisement(hass, device, adv_manufacturer_data)
await hass.async_block_till_done()
assert len(mock_config_flow.mock_calls) == 1
assert mock_config_flow.mock_calls[0][1][0] == "my_domain"
mock_config_flow.reset_mock()
# 3rd discovery should not generate another flow
inject_advertisement(device, adv_service_uuids)
inject_advertisement(hass, device, adv_service_uuids)
await hass.async_block_till_done()
assert len(mock_config_flow.mock_calls) == 0
# 4th discovery should not generate another flow
inject_advertisement(device, adv_manufacturer_data)
inject_advertisement(hass, device, adv_manufacturer_data)
await hass.async_block_till_done()
assert len(mock_config_flow.mock_calls) == 0
@ -590,10 +740,10 @@ async def test_rediscovery(hass, mock_bleak_scanner_start, enable_bluetooth):
local_name="wohand", service_uuids=["cba20d00-224d-11e6-9fb8-0002a5d5c51b"]
)
inject_advertisement(switchbot_device, switchbot_adv)
inject_advertisement(hass, switchbot_device, switchbot_adv)
await hass.async_block_till_done()
inject_advertisement(switchbot_device, switchbot_adv)
inject_advertisement(hass, switchbot_device, switchbot_adv)
await hass.async_block_till_done()
assert len(mock_config_flow.mock_calls) == 1
@ -601,7 +751,7 @@ async def test_rediscovery(hass, mock_bleak_scanner_start, enable_bluetooth):
async_rediscover_address(hass, "44:44:33:11:23:45")
inject_advertisement(switchbot_device, switchbot_adv)
inject_advertisement(hass, switchbot_device, switchbot_adv)
await hass.async_block_till_done()
assert len(mock_config_flow.mock_calls) == 2
@ -633,10 +783,10 @@ async def test_async_discovered_device_api(
wrong_device = BLEDevice("44:44:33:11:23:42", "wrong_name")
wrong_adv = AdvertisementData(local_name="wrong_name", service_uuids=[])
inject_advertisement(wrong_device, wrong_adv)
inject_advertisement(hass, wrong_device, wrong_adv)
switchbot_device = BLEDevice("44:44:33:11:23:45", "wohand")
switchbot_adv = AdvertisementData(local_name="wohand", service_uuids=[])
inject_advertisement(switchbot_device, switchbot_adv)
inject_advertisement(hass, switchbot_device, switchbot_adv)
wrong_device_went_unavailable = False
switchbot_device_went_unavailable = False
@ -670,8 +820,8 @@ async def test_async_discovered_device_api(
assert wrong_device_went_unavailable is True
# See the devices again
inject_advertisement(wrong_device, wrong_adv)
inject_advertisement(switchbot_device, switchbot_adv)
inject_advertisement(hass, wrong_device, wrong_adv)
inject_advertisement(hass, switchbot_device, switchbot_adv)
# Cancel the callbacks
wrong_device_unavailable_cancel()
switchbot_device_unavailable_cancel()
@ -688,10 +838,11 @@ async def test_async_discovered_device_api(
assert len(service_infos) == 1
# wrong_name should not appear because bleak no longer sees it
assert service_infos[0].name == "wohand"
assert service_infos[0].source == SOURCE_LOCAL
assert isinstance(service_infos[0].device, BLEDevice)
assert isinstance(service_infos[0].advertisement, AdvertisementData)
infos = list(service_infos)
assert infos[0].name == "wohand"
assert infos[0].source == SOURCE_LOCAL
assert isinstance(infos[0].device, BLEDevice)
assert isinstance(infos[0].advertisement, AdvertisementData)
assert bluetooth.async_address_present(hass, "44:44:33:11:23:42") is False
assert bluetooth.async_address_present(hass, "44:44:33:11:23:45") is True
@ -736,25 +887,25 @@ async def test_register_callbacks(hass, mock_bleak_scanner_start, enable_bluetoo
service_data={"00000d00-0000-1000-8000-00805f9b34fb": b"H\x10c"},
)
inject_advertisement(switchbot_device, switchbot_adv)
inject_advertisement(hass, switchbot_device, switchbot_adv)
empty_device = BLEDevice("11:22:33:44:55:66", "empty")
empty_adv = AdvertisementData(local_name="empty")
inject_advertisement(empty_device, empty_adv)
inject_advertisement(hass, empty_device, empty_adv)
await hass.async_block_till_done()
empty_device = BLEDevice("11:22:33:44:55:66", "empty")
empty_adv = AdvertisementData(local_name="empty")
# 3rd callback raises ValueError but is still tracked
inject_advertisement(empty_device, empty_adv)
inject_advertisement(hass, empty_device, empty_adv)
await hass.async_block_till_done()
cancel()
# 4th callback should not be tracked since we canceled
inject_advertisement(empty_device, empty_adv)
inject_advertisement(hass, empty_device, empty_adv)
await hass.async_block_till_done()
assert len(callbacks) == 3
@ -819,25 +970,25 @@ async def test_register_callback_by_address(
service_data={"00000d00-0000-1000-8000-00805f9b34fb": b"H\x10c"},
)
inject_advertisement(switchbot_device, switchbot_adv)
inject_advertisement(hass, switchbot_device, switchbot_adv)
empty_device = BLEDevice("11:22:33:44:55:66", "empty")
empty_adv = AdvertisementData(local_name="empty")
inject_advertisement(empty_device, empty_adv)
inject_advertisement(hass, empty_device, empty_adv)
await hass.async_block_till_done()
empty_device = BLEDevice("11:22:33:44:55:66", "empty")
empty_adv = AdvertisementData(local_name="empty")
# 3rd callback raises ValueError but is still tracked
inject_advertisement(empty_device, empty_adv)
inject_advertisement(hass, empty_device, empty_adv)
await hass.async_block_till_done()
cancel()
# 4th callback should not be tracked since we canceled
inject_advertisement(empty_device, empty_adv)
inject_advertisement(hass, empty_device, empty_adv)
await hass.async_block_till_done()
# Now register again with a callback that fails to
@ -907,7 +1058,7 @@ async def test_register_callback_survives_reload(
service_data={"00000d00-0000-1000-8000-00805f9b34fb": b"H\x10c"},
)
inject_advertisement(switchbot_device, switchbot_adv)
inject_advertisement(hass, switchbot_device, switchbot_adv)
assert len(callbacks) == 1
service_info: BluetoothServiceInfo = callbacks[0][0]
assert service_info.name == "wohand"
@ -918,7 +1069,7 @@ async def test_register_callback_survives_reload(
await hass.config_entries.async_reload(entry.entry_id)
await hass.async_block_till_done()
inject_advertisement(switchbot_device, switchbot_adv)
inject_advertisement(hass, switchbot_device, switchbot_adv)
assert len(callbacks) == 2
service_info: BluetoothServiceInfo = callbacks[1][0]
assert service_info.name == "wohand"
@ -955,9 +1106,9 @@ async def test_process_advertisements_bail_on_good_advertisement(
service_data={"00000d00-0000-1000-8000-00805f9b34fa": b"H\x10c"},
)
inject_advertisement(device, adv)
inject_advertisement(device, adv)
inject_advertisement(device, adv)
inject_advertisement(hass, device, adv)
inject_advertisement(hass, device, adv)
inject_advertisement(hass, device, adv)
await asyncio.sleep(0)
@ -997,14 +1148,14 @@ async def test_process_advertisements_ignore_bad_advertisement(
# The goal of this loop is to make sure that async_process_advertisements sees at least one
# callback that returns False
while not done.is_set():
inject_advertisement(device, adv)
inject_advertisement(hass, device, adv)
await asyncio.sleep(0)
# Set the return value and mutate the advertisement
# Check that scan ends and correct advertisement data is returned
return_value.set()
adv.service_data["00000d00-0000-1000-8000-00805f9b34fa"] = b"H\x10c"
inject_advertisement(device, adv)
inject_advertisement(hass, device, adv)
await asyncio.sleep(0)
result = await handle
@ -1062,7 +1213,7 @@ async def test_wrapped_instance_with_filter(
)
scanner.register_detection_callback(_device_detected)
inject_advertisement(switchbot_device, switchbot_adv)
inject_advertisement(hass, switchbot_device, switchbot_adv)
await hass.async_block_till_done()
discovered = await scanner.discover(timeout=0)
@ -1082,12 +1233,12 @@ async def test_wrapped_instance_with_filter(
assert len(discovered) == 0
assert discovered == []
inject_advertisement(switchbot_device, switchbot_adv)
inject_advertisement(hass, switchbot_device, switchbot_adv)
assert len(detected) == 4
# The filter we created in the wrapped scanner with should be respected
# and we should not get another callback
inject_advertisement(empty_device, empty_adv)
inject_advertisement(hass, empty_device, empty_adv)
assert len(detected) == 4
@ -1129,14 +1280,14 @@ async def test_wrapped_instance_with_service_uuids(
scanner.register_detection_callback(_device_detected)
for _ in range(2):
inject_advertisement(switchbot_device, switchbot_adv)
inject_advertisement(hass, switchbot_device, switchbot_adv)
await hass.async_block_till_done()
assert len(detected) == 2
# The UUIDs list we created in the wrapped scanner with should be respected
# and we should not get another callback
inject_advertisement(empty_device, empty_adv)
inject_advertisement(hass, empty_device, empty_adv)
assert len(detected) == 2
@ -1177,9 +1328,9 @@ async def test_wrapped_instance_with_broken_callbacks(
)
scanner.register_detection_callback(_device_detected)
inject_advertisement(switchbot_device, switchbot_adv)
inject_advertisement(hass, switchbot_device, switchbot_adv)
await hass.async_block_till_done()
inject_advertisement(switchbot_device, switchbot_adv)
inject_advertisement(hass, switchbot_device, switchbot_adv)
await hass.async_block_till_done()
assert len(detected) == 1
@ -1222,14 +1373,14 @@ async def test_wrapped_instance_changes_uuids(
scanner.register_detection_callback(_device_detected)
for _ in range(2):
inject_advertisement(switchbot_device, switchbot_adv)
inject_advertisement(hass, switchbot_device, switchbot_adv)
await hass.async_block_till_done()
assert len(detected) == 2
# The UUIDs list we created in the wrapped scanner with should be respected
# and we should not get another callback
inject_advertisement(empty_device, empty_adv)
inject_advertisement(hass, empty_device, empty_adv)
assert len(detected) == 2
@ -1271,14 +1422,14 @@ async def test_wrapped_instance_changes_filters(
scanner.register_detection_callback(_device_detected)
for _ in range(2):
inject_advertisement(switchbot_device, switchbot_adv)
inject_advertisement(hass, switchbot_device, switchbot_adv)
await hass.async_block_till_done()
assert len(detected) == 2
# The UUIDs list we created in the wrapped scanner with should be respected
# and we should not get another callback
inject_advertisement(empty_device, empty_adv)
inject_advertisement(hass, empty_device, empty_adv)
assert len(detected) == 2
@ -1333,7 +1484,7 @@ async def test_async_ble_device_from_address(
switchbot_device = BLEDevice("44:44:33:11:23:45", "wohand")
switchbot_adv = AdvertisementData(local_name="wohand", service_uuids=[])
inject_advertisement(switchbot_device, switchbot_adv)
inject_advertisement(hass, switchbot_device, switchbot_adv)
await hass.async_block_till_done()
assert (

View File

@ -24,7 +24,7 @@ async def test_advertisements_do_not_switch_adapters_for_no_reason(
local_name="wohand_signal_100", service_uuids=[]
)
inject_advertisement_with_source(
switchbot_device_signal_100, switchbot_adv_signal_100, "hci0"
hass, switchbot_device_signal_100, switchbot_adv_signal_100, "hci0"
)
assert (
@ -37,7 +37,7 @@ async def test_advertisements_do_not_switch_adapters_for_no_reason(
local_name="wohand_signal_99", service_uuids=[]
)
inject_advertisement_with_source(
switchbot_device_signal_99, switchbot_adv_signal_99, "hci0"
hass, switchbot_device_signal_99, switchbot_adv_signal_99, "hci0"
)
assert (
@ -50,7 +50,7 @@ async def test_advertisements_do_not_switch_adapters_for_no_reason(
local_name="wohand_good_signal", service_uuids=[]
)
inject_advertisement_with_source(
switchbot_device_signal_98, switchbot_adv_signal_98, "hci1"
hass, switchbot_device_signal_98, switchbot_adv_signal_98, "hci1"
)
# should not switch to hci1
@ -70,7 +70,7 @@ async def test_switching_adapters_based_on_rssi(hass, enable_bluetooth):
local_name="wohand_poor_signal", service_uuids=[]
)
inject_advertisement_with_source(
switchbot_device_poor_signal, switchbot_adv_poor_signal, "hci0"
hass, switchbot_device_poor_signal, switchbot_adv_poor_signal, "hci0"
)
assert (
@ -83,7 +83,7 @@ async def test_switching_adapters_based_on_rssi(hass, enable_bluetooth):
local_name="wohand_good_signal", service_uuids=[]
)
inject_advertisement_with_source(
switchbot_device_good_signal, switchbot_adv_good_signal, "hci1"
hass, switchbot_device_good_signal, switchbot_adv_good_signal, "hci1"
)
assert (
@ -92,7 +92,7 @@ async def test_switching_adapters_based_on_rssi(hass, enable_bluetooth):
)
inject_advertisement_with_source(
switchbot_device_good_signal, switchbot_adv_poor_signal, "hci0"
hass, switchbot_device_good_signal, switchbot_adv_poor_signal, "hci0"
)
assert (
bluetooth.async_ble_device_from_address(hass, address)
@ -108,7 +108,7 @@ async def test_switching_adapters_based_on_rssi(hass, enable_bluetooth):
)
inject_advertisement_with_source(
switchbot_device_similar_signal, switchbot_adv_similar_signal, "hci0"
hass, switchbot_device_similar_signal, switchbot_adv_similar_signal, "hci0"
)
assert (
bluetooth.async_ble_device_from_address(hass, address)
@ -129,6 +129,7 @@ async def test_switching_adapters_based_on_stale(hass, enable_bluetooth):
local_name="wohand_poor_signal_hci0", service_uuids=[]
)
inject_advertisement_with_time_and_source(
hass,
switchbot_device_poor_signal_hci0,
switchbot_adv_poor_signal_hci0,
start_time_monotonic,
@ -147,6 +148,7 @@ async def test_switching_adapters_based_on_stale(hass, enable_bluetooth):
local_name="wohand_poor_signal_hci1", service_uuids=[]
)
inject_advertisement_with_time_and_source(
hass,
switchbot_device_poor_signal_hci1,
switchbot_adv_poor_signal_hci1,
start_time_monotonic,
@ -163,6 +165,7 @@ async def test_switching_adapters_based_on_stale(hass, enable_bluetooth):
# even though the signal is poor because the device is now
# likely unreachable via hci0
inject_advertisement_with_time_and_source(
hass,
switchbot_device_poor_signal_hci1,
switchbot_adv_poor_signal_hci1,
start_time_monotonic + STALE_ADVERTISEMENT_SECONDS + 1,

View File

@ -20,7 +20,7 @@ from homeassistant.helpers.service_info.bluetooth import BluetoothServiceInfo
from homeassistant.setup import async_setup_component
from homeassistant.util import dt as dt_util
from . import _get_manager, patch_all_discovered_devices
from . import patch_all_discovered_devices, patch_history
from tests.common import async_fire_time_changed
@ -178,15 +178,9 @@ async def test_unavailable_callbacks_mark_the_coordinator_unavailable(
saved_callback(GENERIC_BLUETOOTH_SERVICE_INFO, BluetoothChange.ADVERTISEMENT)
assert coordinator.available is True
scanner = _get_manager()
with patch_all_discovered_devices(
[MagicMock(address="44:44:33:11:23:45")]
), patch.object(
scanner,
"history",
{"aa:bb:cc:dd:ee:ff": MagicMock()},
):
), patch_history({"aa:bb:cc:dd:ee:ff": MagicMock()}):
async_fire_time_changed(
hass, dt_util.utcnow() + timedelta(seconds=UNAVAILABLE_TRACK_SECONDS)
)
@ -198,11 +192,7 @@ async def test_unavailable_callbacks_mark_the_coordinator_unavailable(
with patch_all_discovered_devices(
[MagicMock(address="44:44:33:11:23:45")]
), patch.object(
scanner,
"history",
{"aa:bb:cc:dd:ee:ff": MagicMock()},
):
), patch_history({"aa:bb:cc:dd:ee:ff": MagicMock()}):
async_fire_time_changed(
hass, dt_util.utcnow() + timedelta(seconds=UNAVAILABLE_TRACK_SECONDS)
)

View File

@ -32,7 +32,7 @@ from homeassistant.helpers.entity import DeviceInfo
from homeassistant.setup import async_setup_component
from homeassistant.util import dt as dt_util
from . import _get_manager, patch_all_discovered_devices
from . import patch_all_discovered_devices, patch_connectable_history, patch_history
from tests.common import MockEntityPlatform, async_fire_time_changed
@ -246,12 +246,9 @@ async def test_unavailable_after_no_data(hass, mock_bleak_scanner_start):
assert len(mock_add_entities.mock_calls) == 1
assert coordinator.available is True
assert processor.available is True
scanner = _get_manager()
with patch_all_discovered_devices(
[MagicMock(address="44:44:33:11:23:45")]
), patch.object(
scanner,
"history",
), patch_history({"aa:bb:cc:dd:ee:ff": MagicMock()}), patch_connectable_history(
{"aa:bb:cc:dd:ee:ff": MagicMock()},
):
async_fire_time_changed(
@ -268,9 +265,7 @@ async def test_unavailable_after_no_data(hass, mock_bleak_scanner_start):
with patch_all_discovered_devices(
[MagicMock(address="44:44:33:11:23:45")]
), patch.object(
scanner,
"history",
), patch_history({"aa:bb:cc:dd:ee:ff": MagicMock()}), patch_connectable_history(
{"aa:bb:cc:dd:ee:ff": MagicMock()},
):
async_fire_time_changed(

View File

@ -90,6 +90,8 @@ async def test_preserve_new_tracked_device_name(
source="local",
device=BLEDevice(address, None),
advertisement=AdvertisementData(local_name="empty"),
time=0,
connectable=False,
)
# Return with name when seen first time
mock_async_discovered_service_info.return_value = [device]
@ -113,6 +115,8 @@ async def test_preserve_new_tracked_device_name(
source="local",
device=BLEDevice(address, None),
advertisement=AdvertisementData(local_name="empty"),
time=0,
connectable=False,
)
# Return with name when seen first time
mock_async_discovered_service_info.return_value = [device]
@ -155,6 +159,8 @@ async def test_tracking_battery_times_out(
source="local",
device=BLEDevice(address, None),
advertisement=AdvertisementData(local_name="empty"),
time=0,
connectable=False,
)
# Return with name when seen first time
mock_async_discovered_service_info.return_value = [device]
@ -219,6 +225,8 @@ async def test_tracking_battery_fails(hass, mock_bluetooth, mock_device_tracker_
source="local",
device=BLEDevice(address, None),
advertisement=AdvertisementData(local_name="empty"),
time=0,
connectable=False,
)
# Return with name when seen first time
mock_async_discovered_service_info.return_value = [device]
@ -285,6 +293,8 @@ async def test_tracking_battery_successful(
source="local",
device=BLEDevice(address, None),
advertisement=AdvertisementData(local_name="empty"),
time=0,
connectable=True,
)
# Return with name when seen first time
mock_async_discovered_service_info.return_value = [device]

View File

@ -4,8 +4,18 @@
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData
from homeassistant.components.bluetooth import SOURCE_LOCAL, BluetoothServiceInfoBleak
from homeassistant.components.bluetooth import BluetoothServiceInfoBleak
COOKER_SERVICE_INFO = BluetoothServiceInfoBleak.from_advertisement(
BLEDevice("1.1.1.1", "COOKERHOOD_FJAR"), AdvertisementData(), source=SOURCE_LOCAL
COOKER_SERVICE_INFO = BluetoothServiceInfoBleak(
name="COOKERHOOD_FJAR",
address="AA:BB:CC:DD:EE:FF",
rssi=-60,
manufacturer_data={},
service_uuids=[],
service_data={},
source="local",
device=BLEDevice(address="AA:BB:CC:DD:EE:FF", name="COOKERHOOD_FJAR"),
advertisement=AdvertisementData(),
time=0,
connectable=True,
)

View File

@ -69,6 +69,28 @@ WOHAND_SERVICE_INFO = BluetoothServiceInfoBleak(
service_uuids=["cba20d00-224d-11e6-9fb8-0002a5d5c51b"],
),
device=BLEDevice("aa:bb:cc:dd:ee:ff", "WoHand"),
time=0,
connectable=True,
)
WOHAND_SERVICE_INFO_NOT_CONNECTABLE = BluetoothServiceInfoBleak(
name="WoHand",
manufacturer_data={89: b"\xfd`0U\x92W"},
service_data={"00000d00-0000-1000-8000-00805f9b34fb": b"H\x90\xd9"},
service_uuids=["cba20d00-224d-11e6-9fb8-0002a5d5c51b"],
address="aa:bb:cc:dd:ee:ff",
rssi=-60,
source="local",
advertisement=AdvertisementData(
local_name="WoHand",
manufacturer_data={89: b"\xfd`0U\x92W"},
service_data={"00000d00-0000-1000-8000-00805f9b34fb": b"H\x90\xd9"},
service_uuids=["cba20d00-224d-11e6-9fb8-0002a5d5c51b"],
),
device=BLEDevice("aa:bb:cc:dd:ee:ff", "WoHand"),
time=0,
connectable=False,
)
@ -87,6 +109,8 @@ WOHAND_ENCRYPTED_SERVICE_INFO = BluetoothServiceInfoBleak(
service_uuids=["cba20d00-224d-11e6-9fb8-0002a5d5c51b"],
),
device=BLEDevice("798A8547-2A3D-C609-55FF-73FA824B923B", "WoHand"),
time=0,
connectable=True,
)
@ -105,6 +129,8 @@ WOHAND_SERVICE_ALT_ADDRESS_INFO = BluetoothServiceInfoBleak(
service_uuids=["cba20d00-224d-11e6-9fb8-0002a5d5c51b"],
),
device=BLEDevice("aa:bb:cc:dd:ee:ff", "WoHand"),
time=0,
connectable=True,
)
WOCURTAIN_SERVICE_INFO = BluetoothServiceInfoBleak(
name="WoCurtain",
@ -121,6 +147,8 @@ WOCURTAIN_SERVICE_INFO = BluetoothServiceInfoBleak(
service_uuids=["cba20d00-224d-11e6-9fb8-0002a5d5c51b"],
),
device=BLEDevice("aa:bb:cc:dd:ee:ff", "WoCurtain"),
time=0,
connectable=True,
)
WOSENSORTH_SERVICE_INFO = BluetoothServiceInfoBleak(
@ -136,6 +164,8 @@ WOSENSORTH_SERVICE_INFO = BluetoothServiceInfoBleak(
service_data={"0000fd3d-0000-1000-8000-00805f9b34fb": b"T\x00d\x00\x96\xac"},
),
device=BLEDevice("aa:bb:cc:dd:ee:ff", "WoSensorTH"),
time=0,
connectable=False,
)
NOT_SWITCHBOT_INFO = BluetoothServiceInfoBleak(
@ -151,4 +181,6 @@ NOT_SWITCHBOT_INFO = BluetoothServiceInfoBleak(
service_data={},
),
device=BLEDevice("aa:bb:cc:dd:ee:ff", "unknown"),
time=0,
connectable=True,
)

View File

@ -14,6 +14,7 @@ from . import (
WOHAND_ENCRYPTED_SERVICE_INFO,
WOHAND_SERVICE_ALT_ADDRESS_INFO,
WOHAND_SERVICE_INFO,
WOHAND_SERVICE_INFO_NOT_CONNECTABLE,
WOSENSORTH_SERVICE_INFO,
init_integration,
patch_async_setup_entry,
@ -112,6 +113,17 @@ async def test_async_step_bluetooth_not_switchbot(hass):
assert result["reason"] == "not_supported"
async def test_async_step_bluetooth_not_connectable(hass):
"""Test discovery via bluetooth and its not connectable switchbot."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_BLUETOOTH},
data=WOHAND_SERVICE_INFO_NOT_CONNECTABLE,
)
assert result["type"] == FlowResultType.ABORT
assert result["reason"] == "not_supported"
async def test_user_setup_wohand(hass):
"""Test the user initiated form with password and valid mac."""
@ -203,7 +215,12 @@ async def test_user_setup_wocurtain_or_bot(hass):
with patch(
"homeassistant.components.switchbot.config_flow.async_discovered_service_info",
return_value=[WOCURTAIN_SERVICE_INFO, WOHAND_SERVICE_ALT_ADDRESS_INFO],
return_value=[
NOT_SWITCHBOT_INFO,
WOCURTAIN_SERVICE_INFO,
WOHAND_SERVICE_ALT_ADDRESS_INFO,
WOHAND_SERVICE_INFO_NOT_CONNECTABLE,
],
):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
@ -234,7 +251,11 @@ async def test_user_setup_wocurtain_or_bot_with_password(hass):
with patch(
"homeassistant.components.switchbot.config_flow.async_discovered_service_info",
return_value=[WOCURTAIN_SERVICE_INFO, WOHAND_ENCRYPTED_SERVICE_INFO],
return_value=[
WOCURTAIN_SERVICE_INFO,
WOHAND_ENCRYPTED_SERVICE_INFO,
WOHAND_SERVICE_INFO_NOT_CONNECTABLE,
],
):
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}

View File

@ -15,6 +15,8 @@ NOT_SENSOR_PUSH_SERVICE_INFO = BluetoothServiceInfoBleak(
service_uuids=[],
source="local",
advertisement=AdvertisementData(local_name="Not it"),
time=0,
connectable=False,
)
LYWSDCGQ_SERVICE_INFO = BluetoothServiceInfoBleak(
@ -29,6 +31,8 @@ LYWSDCGQ_SERVICE_INFO = BluetoothServiceInfoBleak(
service_uuids=["0000fe95-0000-1000-8000-00805f9b34fb"],
source="local",
advertisement=AdvertisementData(local_name="Not it"),
time=0,
connectable=False,
)
MMC_T201_1_SERVICE_INFO = BluetoothServiceInfoBleak(
@ -43,6 +47,8 @@ MMC_T201_1_SERVICE_INFO = BluetoothServiceInfoBleak(
service_uuids=["0000fe95-0000-1000-8000-00805f9b34fb"],
source="local",
advertisement=AdvertisementData(local_name="Not it"),
time=0,
connectable=False,
)
JTYJGD03MI_SERVICE_INFO = BluetoothServiceInfoBleak(
@ -57,6 +63,8 @@ JTYJGD03MI_SERVICE_INFO = BluetoothServiceInfoBleak(
service_uuids=["0000fe95-0000-1000-8000-00805f9b34fb"],
source="local",
advertisement=AdvertisementData(local_name="Not it"),
time=0,
connectable=False,
)
YLKG07YL_SERVICE_INFO = BluetoothServiceInfoBleak(
@ -71,6 +79,8 @@ YLKG07YL_SERVICE_INFO = BluetoothServiceInfoBleak(
service_uuids=["0000fe95-0000-1000-8000-00805f9b34fb"],
source="local",
advertisement=AdvertisementData(local_name="Not it"),
time=0,
connectable=False,
)
MISSING_PAYLOAD_ENCRYPTED = BluetoothServiceInfoBleak(
@ -85,10 +95,14 @@ MISSING_PAYLOAD_ENCRYPTED = BluetoothServiceInfoBleak(
service_uuids=["0000fe95-0000-1000-8000-00805f9b34fb"],
source="local",
advertisement=AdvertisementData(local_name="Not it"),
time=0,
connectable=False,
)
def make_advertisement(address: str, payload: bytes) -> BluetoothServiceInfoBleak:
def make_advertisement(
address: str, payload: bytes, connectable: bool = True
) -> BluetoothServiceInfoBleak:
"""Make a dummy advertisement."""
return BluetoothServiceInfoBleak(
name="Test Device",
@ -102,4 +116,6 @@ def make_advertisement(address: str, payload: bytes) -> BluetoothServiceInfoBlea
service_uuids=["0000fe95-0000-1000-8000-00805f9b34fb"],
source="local",
advertisement=AdvertisementData(local_name="Test Device"),
time=0,
connectable=connectable,
)

View File

@ -2,7 +2,10 @@
from unittest.mock import patch
from homeassistant.components.bluetooth import BluetoothChange
from homeassistant.components.bluetooth import (
BluetoothChange,
async_get_advertisement_callback,
)
from homeassistant.components.sensor import ATTR_STATE_CLASS
from homeassistant.components.xiaomi_ble.const import DOMAIN
from homeassistant.const import ATTR_FRIENDLY_NAME, ATTR_UNIT_OF_MEASUREMENT
@ -294,6 +297,190 @@ async def test_xiaomi_HHCCJCY01(hass):
await hass.async_block_till_done()
async def test_xiaomi_HHCCJCY01_not_connectable(hass):
"""This device has multiple advertisements before all sensors are visible but not connectable."""
entry = MockConfigEntry(
domain=DOMAIN,
unique_id="C4:7C:8D:6A:3E:7B",
)
entry.add_to_hass(hass)
saved_callback = None
def _async_register_callback(_hass, _callback, _matcher, _mode):
nonlocal saved_callback
saved_callback = _callback
return lambda: None
with patch(
"homeassistant.components.bluetooth.update_coordinator.async_register_callback",
_async_register_callback,
):
assert await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
assert len(hass.states.async_all()) == 0
saved_callback(
make_advertisement(
"C4:7C:8D:6A:3E:7A",
b"q \x98\x00fz>j\x8d|\xc4\r\x07\x10\x03\x00\x00\x00",
connectable=False,
),
BluetoothChange.ADVERTISEMENT,
)
saved_callback(
make_advertisement(
"C4:7C:8D:6A:3E:7A",
b"q \x98\x00hz>j\x8d|\xc4\r\t\x10\x02W\x02",
connectable=False,
),
BluetoothChange.ADVERTISEMENT,
)
saved_callback(
make_advertisement(
"C4:7C:8D:6A:3E:7A",
b"q \x98\x00Gz>j\x8d|\xc4\r\x08\x10\x01@",
connectable=False,
),
BluetoothChange.ADVERTISEMENT,
)
saved_callback(
make_advertisement(
"C4:7C:8D:6A:3E:7A",
b"q \x98\x00iz>j\x8d|\xc4\r\x04\x10\x02\xf4\x00",
connectable=False,
),
BluetoothChange.ADVERTISEMENT,
)
await hass.async_block_till_done()
assert len(hass.states.async_all()) == 4
illum_sensor = hass.states.get("sensor.plant_sensor_6a3e7a_illuminance")
illum_sensor_attr = illum_sensor.attributes
assert illum_sensor.state == "0"
assert illum_sensor_attr[ATTR_FRIENDLY_NAME] == "Plant Sensor 6A3E7A Illuminance"
assert illum_sensor_attr[ATTR_UNIT_OF_MEASUREMENT] == "lx"
assert illum_sensor_attr[ATTR_STATE_CLASS] == "measurement"
cond_sensor = hass.states.get("sensor.plant_sensor_6a3e7a_conductivity")
cond_sensor_attribtes = cond_sensor.attributes
assert cond_sensor.state == "599"
assert (
cond_sensor_attribtes[ATTR_FRIENDLY_NAME] == "Plant Sensor 6A3E7A Conductivity"
)
assert cond_sensor_attribtes[ATTR_UNIT_OF_MEASUREMENT] == "µS/cm"
assert cond_sensor_attribtes[ATTR_STATE_CLASS] == "measurement"
moist_sensor = hass.states.get("sensor.plant_sensor_6a3e7a_moisture")
moist_sensor_attribtes = moist_sensor.attributes
assert moist_sensor.state == "64"
assert moist_sensor_attribtes[ATTR_FRIENDLY_NAME] == "Plant Sensor 6A3E7A Moisture"
assert moist_sensor_attribtes[ATTR_UNIT_OF_MEASUREMENT] == "%"
assert moist_sensor_attribtes[ATTR_STATE_CLASS] == "measurement"
temp_sensor = hass.states.get("sensor.plant_sensor_6a3e7a_temperature")
temp_sensor_attribtes = temp_sensor.attributes
assert temp_sensor.state == "24.4"
assert (
temp_sensor_attribtes[ATTR_FRIENDLY_NAME] == "Plant Sensor 6A3E7A Temperature"
)
assert temp_sensor_attribtes[ATTR_UNIT_OF_MEASUREMENT] == "°C"
assert temp_sensor_attribtes[ATTR_STATE_CLASS] == "measurement"
# No battery sensor since its not connectable
assert await hass.config_entries.async_unload(entry.entry_id)
await hass.async_block_till_done()
async def test_xiaomi_HHCCJCY01_only_some_sources_connectable(hass):
"""This device has multiple advertisements before all sensors are visible and some sources are connectable."""
entry = MockConfigEntry(
domain=DOMAIN,
unique_id="C4:7C:8D:6A:3E:7A",
)
entry.add_to_hass(hass)
saved_callback = async_get_advertisement_callback(hass)
assert await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
assert len(hass.states.async_all()) == 0
saved_callback(
make_advertisement(
"C4:7C:8D:6A:3E:7A",
b"q \x98\x00fz>j\x8d|\xc4\r\x07\x10\x03\x00\x00\x00",
connectable=True,
),
)
saved_callback(
make_advertisement(
"C4:7C:8D:6A:3E:7A",
b"q \x98\x00hz>j\x8d|\xc4\r\t\x10\x02W\x02",
connectable=False,
),
)
saved_callback(
make_advertisement(
"C4:7C:8D:6A:3E:7A",
b"q \x98\x00Gz>j\x8d|\xc4\r\x08\x10\x01@",
connectable=False,
),
)
saved_callback(
make_advertisement(
"C4:7C:8D:6A:3E:7A",
b"q \x98\x00iz>j\x8d|\xc4\r\x04\x10\x02\xf4\x00",
connectable=False,
),
)
await hass.async_block_till_done()
assert len(hass.states.async_all()) == 5
illum_sensor = hass.states.get("sensor.plant_sensor_6a3e7a_illuminance")
illum_sensor_attr = illum_sensor.attributes
assert illum_sensor.state == "0"
assert illum_sensor_attr[ATTR_FRIENDLY_NAME] == "Plant Sensor 6A3E7A Illuminance"
assert illum_sensor_attr[ATTR_UNIT_OF_MEASUREMENT] == "lx"
assert illum_sensor_attr[ATTR_STATE_CLASS] == "measurement"
cond_sensor = hass.states.get("sensor.plant_sensor_6a3e7a_conductivity")
cond_sensor_attribtes = cond_sensor.attributes
assert cond_sensor.state == "599"
assert (
cond_sensor_attribtes[ATTR_FRIENDLY_NAME] == "Plant Sensor 6A3E7A Conductivity"
)
assert cond_sensor_attribtes[ATTR_UNIT_OF_MEASUREMENT] == "µS/cm"
assert cond_sensor_attribtes[ATTR_STATE_CLASS] == "measurement"
moist_sensor = hass.states.get("sensor.plant_sensor_6a3e7a_moisture")
moist_sensor_attribtes = moist_sensor.attributes
assert moist_sensor.state == "64"
assert moist_sensor_attribtes[ATTR_FRIENDLY_NAME] == "Plant Sensor 6A3E7A Moisture"
assert moist_sensor_attribtes[ATTR_UNIT_OF_MEASUREMENT] == "%"
assert moist_sensor_attribtes[ATTR_STATE_CLASS] == "measurement"
temp_sensor = hass.states.get("sensor.plant_sensor_6a3e7a_temperature")
temp_sensor_attribtes = temp_sensor.attributes
assert temp_sensor.state == "24.4"
assert (
temp_sensor_attribtes[ATTR_FRIENDLY_NAME] == "Plant Sensor 6A3E7A Temperature"
)
assert temp_sensor_attribtes[ATTR_UNIT_OF_MEASUREMENT] == "°C"
assert temp_sensor_attribtes[ATTR_STATE_CLASS] == "measurement"
batt_sensor = hass.states.get("sensor.plant_sensor_6a3e7a_battery")
batt_sensor_attribtes = batt_sensor.attributes
assert batt_sensor.state == "5"
assert batt_sensor_attribtes[ATTR_FRIENDLY_NAME] == "Plant Sensor 6A3E7A Battery"
assert batt_sensor_attribtes[ATTR_UNIT_OF_MEASUREMENT] == "%"
assert batt_sensor_attribtes[ATTR_STATE_CLASS] == "measurement"
assert await hass.config_entries.async_unload(entry.entry_id)
await hass.async_block_till_done()
async def test_xiaomi_CGDK2(hass):
"""This device has encrypion so we need to retrieve its bindkey from the configentry."""
entry = MockConfigEntry(

View File

@ -17,6 +17,8 @@ YALE_ACCESS_LOCK_DISCOVERY_INFO = BluetoothServiceInfoBleak(
source="local",
device=BLEDevice(address="AA:BB:CC:DD:EE:FF", name="M1012LU"),
advertisement=AdvertisementData(),
time=0,
connectable=True,
)
@ -33,6 +35,8 @@ LOCK_DISCOVERY_INFO_UUID_ADDRESS = BluetoothServiceInfoBleak(
source="local",
device=BLEDevice(address="AA:BB:CC:DD:EE:FF", name="M1012LU"),
advertisement=AdvertisementData(),
time=0,
connectable=True,
)
OLD_FIRMWARE_LOCK_DISCOVERY_INFO = BluetoothServiceInfoBleak(
@ -48,6 +52,8 @@ OLD_FIRMWARE_LOCK_DISCOVERY_INFO = BluetoothServiceInfoBleak(
source="local",
device=BLEDevice(address="AA:BB:CC:DD:EE:FF", name="Aug"),
advertisement=AdvertisementData(),
time=0,
connectable=True,
)
@ -64,4 +70,6 @@ NOT_YALE_DISCOVERY_INFO = BluetoothServiceInfoBleak(
source="local",
device=BLEDevice(address="AA:BB:CC:DD:EE:FF", name="Aug"),
advertisement=AdvertisementData(),
time=0,
connectable=True,
)