Split bluetooth models into base_scanner and wrappers (#82291)

This commit is contained in:
J. Nick Koston 2022-11-20 08:44:28 -06:00 committed by GitHub
parent 76cb1c4978
commit 7801cd96de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 821 additions and 792 deletions

View File

@ -20,6 +20,7 @@ from bluetooth_adapters import (
adapter_unique_name,
get_adapters,
)
from home_assistant_bluetooth import BluetoothServiceInfo, BluetoothServiceInfoBleak
from homeassistant.components import usb
from homeassistant.config_entries import (
@ -41,6 +42,7 @@ from homeassistant.helpers.issue_registry import (
from homeassistant.loader import async_get_bluetooth
from . import models
from .base_scanner import BaseHaRemoteScanner, BaseHaScanner
from .const import (
BLUETOOTH_DISCOVERY_COOLDOWN_SECONDS,
CONF_ADAPTER,
@ -55,18 +57,13 @@ from .const import (
from .manager import BluetoothManager
from .match import BluetoothCallbackMatcher, IntegrationMatcher
from .models import (
BaseHaRemoteScanner,
BaseHaScanner,
BluetoothCallback,
BluetoothChange,
BluetoothScanningMode,
BluetoothServiceInfo,
BluetoothServiceInfoBleak,
HaBleakScannerWrapper,
HaBluetoothConnector,
ProcessAdvertisementCallback,
)
from .scanner import HaScanner, ScannerStartError
from .wrappers import HaBleakScannerWrapper, HaBluetoothConnector
if TYPE_CHECKING:
from bleak.backends.device import BLEDevice

View File

@ -0,0 +1,195 @@
"""Base classes for HA Bluetooth scanners for bluetooth."""
from __future__ import annotations
from abc import abstractmethod
from collections.abc import Callable
import datetime
from datetime import timedelta
from typing import Any, Final
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData
from bleak_retry_connector import NO_RSSI_VALUE
from home_assistant_bluetooth import BluetoothServiceInfoBleak
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback as hass_callback
from homeassistant.helpers.event import async_track_time_interval
from homeassistant.util.dt import monotonic_time_coarse
from .const import (
CONNECTABLE_FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS,
FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS,
)
from .models import HaBluetoothConnector
MONOTONIC_TIME: Final = monotonic_time_coarse
class BaseHaScanner:
"""Base class for Ha Scanners."""
def __init__(self, hass: HomeAssistant, source: str) -> None:
"""Initialize the scanner."""
self.hass = hass
self.source = source
@property
@abstractmethod
def discovered_devices(self) -> list[BLEDevice]:
"""Return a list of discovered devices."""
@property
@abstractmethod
def discovered_devices_and_advertisement_data(
self,
) -> dict[str, tuple[BLEDevice, AdvertisementData]]:
"""Return a list of discovered devices and their advertisement data."""
async def async_diagnostics(self) -> dict[str, Any]:
"""Return diagnostic information about the scanner."""
return {
"type": self.__class__.__name__,
"discovered_devices": [
{
"name": device.name,
"address": device.address,
}
for device in self.discovered_devices
],
}
class BaseHaRemoteScanner(BaseHaScanner):
"""Base class for a Home Assistant remote BLE scanner."""
def __init__(
self,
hass: HomeAssistant,
scanner_id: str,
new_info_callback: Callable[[BluetoothServiceInfoBleak], None],
connector: HaBluetoothConnector,
connectable: bool,
) -> None:
"""Initialize the scanner."""
super().__init__(hass, scanner_id)
self._new_info_callback = new_info_callback
self._discovered_device_advertisement_datas: dict[
str, tuple[BLEDevice, AdvertisementData]
] = {}
self._discovered_device_timestamps: dict[str, float] = {}
self._connector = connector
self._connectable = connectable
self._details: dict[str, str | HaBluetoothConnector] = {"source": scanner_id}
self._expire_seconds = FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS
if connectable:
self._details["connector"] = connector
self._expire_seconds = (
CONNECTABLE_FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS
)
@hass_callback
def async_setup(self) -> CALLBACK_TYPE:
"""Set up the scanner."""
return async_track_time_interval(
self.hass, self._async_expire_devices, timedelta(seconds=30)
)
def _async_expire_devices(self, _datetime: datetime.datetime) -> None:
"""Expire old devices."""
now = MONOTONIC_TIME()
expired = [
address
for address, timestamp in self._discovered_device_timestamps.items()
if now - timestamp > self._expire_seconds
]
for address in expired:
del self._discovered_device_advertisement_datas[address]
del self._discovered_device_timestamps[address]
@property
def discovered_devices(self) -> list[BLEDevice]:
"""Return a list of discovered devices."""
return [
device_advertisement_data[0]
for device_advertisement_data in self._discovered_device_advertisement_datas.values()
]
@property
def discovered_devices_and_advertisement_data(
self,
) -> dict[str, tuple[BLEDevice, AdvertisementData]]:
"""Return a list of discovered devices and advertisement data."""
return self._discovered_device_advertisement_datas
@hass_callback
def _async_on_advertisement(
self,
address: str,
rssi: int,
local_name: str | None,
service_uuids: list[str],
service_data: dict[str, bytes],
manufacturer_data: dict[int, bytes],
tx_power: int | None,
) -> None:
"""Call the registered callback."""
now = MONOTONIC_TIME()
if prev_discovery := self._discovered_device_advertisement_datas.get(address):
# Merge the new data with the old data
# to function the same as BlueZ which
# merges the dicts on PropertiesChanged
prev_device = prev_discovery[0]
prev_advertisement = prev_discovery[1]
if (
local_name
and prev_device.name
and len(prev_device.name) > len(local_name)
):
local_name = prev_device.name
if prev_advertisement.service_uuids:
service_uuids = list(
set(service_uuids + prev_advertisement.service_uuids)
)
if prev_advertisement.service_data:
service_data = {**prev_advertisement.service_data, **service_data}
if prev_advertisement.manufacturer_data:
manufacturer_data = {
**prev_advertisement.manufacturer_data,
**manufacturer_data,
}
advertisement_data = AdvertisementData(
local_name=None if local_name == "" else local_name,
manufacturer_data=manufacturer_data,
service_data=service_data,
service_uuids=service_uuids,
rssi=rssi,
tx_power=NO_RSSI_VALUE if tx_power is None else tx_power,
platform_data=(),
)
device = BLEDevice( # type: ignore[no-untyped-call]
address=address,
name=local_name,
details=self._details,
rssi=rssi, # deprecated, will be removed in newer bleak
)
self._discovered_device_advertisement_datas[address] = (
device,
advertisement_data,
)
self._discovered_device_timestamps[address] = now
self._new_info_callback(
BluetoothServiceInfoBleak(
name=advertisement_data.local_name or device.name or device.address,
address=device.address,
rssi=rssi,
manufacturer_data=advertisement_data.manufacturer_data,
service_data=advertisement_data.service_data,
service_uuids=advertisement_data.service_uuids,
source=self.source,
device=device,
advertisement=advertisement_data,
connectable=self._connectable,
time=now,
)
)

View File

@ -31,6 +31,15 @@ START_TIMEOUT = 15
#
FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS: Final = 60 * 15
# The maximum time between advertisements for a device to be considered
# stale when the advertisement tracker can determine the interval for
# connectable devices.
#
# BlueZ uses 180 seconds by default but we give it a bit more time
# to account for the esp32's bluetooth stack being a bit slower
# than BlueZ's.
CONNECTABLE_FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS: Final = 195
# We must recover before we hit the 180s mark
# where the device is removed from the stack

View File

@ -29,6 +29,7 @@ from homeassistant.helpers.event import async_track_time_interval
from homeassistant.util.dt import monotonic_time_coarse
from .advertisement_tracker import AdvertisementTracker
from .base_scanner import BaseHaScanner
from .const import (
FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS,
UNAVAILABLE_TRACK_SECONDS,
@ -43,12 +44,7 @@ from .match import (
IntegrationMatcher,
ble_device_matches,
)
from .models import (
BaseHaScanner,
BluetoothCallback,
BluetoothChange,
BluetoothServiceInfoBleak,
)
from .models import BluetoothCallback, BluetoothChange, BluetoothServiceInfoBleak
from .usage import install_multiple_bleak_catcher, uninstall_multiple_bleak_catcher
from .util import async_load_history_from_system

View File

@ -1,61 +1,35 @@
"""Models for bluetooth."""
from __future__ import annotations
from abc import abstractmethod
import asyncio
from collections.abc import Callable
import contextlib
from dataclasses import dataclass
import datetime
from datetime import timedelta
from enum import Enum
import logging
from typing import TYPE_CHECKING, Any, Final
from typing import TYPE_CHECKING, Final
from bleak import BleakClient, BleakError
from bleak.backends.client import BaseBleakClient, get_platform_client_backend_type
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import (
AdvertisementData,
AdvertisementDataCallback,
BaseBleakScanner,
)
from bleak_retry_connector import NO_RSSI_VALUE, freshen_ble_device
from bleak import BaseBleakClient
from home_assistant_bluetooth import BluetoothServiceInfoBleak
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback as hass_callback
from homeassistant.helpers.event import async_track_time_interval
from homeassistant.helpers.frame import report
from homeassistant.helpers.service_info.bluetooth import ( # noqa: F401 # pylint: disable=unused-import
BluetoothServiceInfo,
)
from homeassistant.util.dt import monotonic_time_coarse
from .const import FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS
# The maximum time between advertisements for a device to be considered
# stale when the advertisement tracker can determine the interval for
# connectable devices.
#
# BlueZ uses 180 seconds by default but we give it a bit more time
# to account for the esp32's bluetooth stack being a bit slower
# than BlueZ's.
CONNECTABLE_FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS: Final = 195
if TYPE_CHECKING:
from .manager import BluetoothManager
_LOGGER = logging.getLogger(__name__)
FILTER_UUIDS: Final = "UUIDs"
MANAGER: BluetoothManager | None = None
MONOTONIC_TIME: Final = monotonic_time_coarse
@dataclass
class HaBluetoothConnector:
"""Data for how to connect a BLEDevice from a given scanner."""
client: type[BaseBleakClient]
source: str
can_connect: Callable[[], bool]
class BluetoothScanningMode(Enum):
"""The mode of scanning for bluetooth devices."""
@ -66,434 +40,3 @@ class BluetoothScanningMode(Enum):
BluetoothChange = Enum("BluetoothChange", "ADVERTISEMENT")
BluetoothCallback = Callable[[BluetoothServiceInfoBleak, BluetoothChange], None]
ProcessAdvertisementCallback = Callable[[BluetoothServiceInfoBleak], bool]
@dataclass
class HaBluetoothConnector:
"""Data for how to connect a BLEDevice from a given scanner."""
client: type[BaseBleakClient]
source: str
can_connect: Callable[[], bool]
@dataclass
class _HaWrappedBleakBackend:
"""Wrap bleak backend to make it usable by Home Assistant."""
device: BLEDevice
client: type[BaseBleakClient]
class BaseHaScanner:
"""Base class for Ha Scanners."""
def __init__(self, hass: HomeAssistant, source: str) -> None:
"""Initialize the scanner."""
self.hass = hass
self.source = source
@property
@abstractmethod
def discovered_devices(self) -> list[BLEDevice]:
"""Return a list of discovered devices."""
@property
@abstractmethod
def discovered_devices_and_advertisement_data(
self,
) -> dict[str, tuple[BLEDevice, AdvertisementData]]:
"""Return a list of discovered devices and their advertisement data."""
async def async_diagnostics(self) -> dict[str, Any]:
"""Return diagnostic information about the scanner."""
return {
"type": self.__class__.__name__,
"discovered_devices": [
{
"name": device.name,
"address": device.address,
}
for device in self.discovered_devices
],
}
class BaseHaRemoteScanner(BaseHaScanner):
"""Base class for a Home Assistant remote BLE scanner."""
def __init__(
self,
hass: HomeAssistant,
scanner_id: str,
new_info_callback: Callable[[BluetoothServiceInfoBleak], None],
connector: HaBluetoothConnector,
connectable: bool,
) -> None:
"""Initialize the scanner."""
super().__init__(hass, scanner_id)
self._new_info_callback = new_info_callback
self._discovered_device_advertisement_datas: dict[
str, tuple[BLEDevice, AdvertisementData]
] = {}
self._discovered_device_timestamps: dict[str, float] = {}
self._connector = connector
self._connectable = connectable
self._details: dict[str, str | HaBluetoothConnector] = {"source": scanner_id}
self._expire_seconds = FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS
if connectable:
self._details["connector"] = connector
self._expire_seconds = (
CONNECTABLE_FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS
)
@hass_callback
def async_setup(self) -> CALLBACK_TYPE:
"""Set up the scanner."""
return async_track_time_interval(
self.hass, self._async_expire_devices, timedelta(seconds=30)
)
def _async_expire_devices(self, _datetime: datetime.datetime) -> None:
"""Expire old devices."""
now = MONOTONIC_TIME()
expired = [
address
for address, timestamp in self._discovered_device_timestamps.items()
if now - timestamp > self._expire_seconds
]
for address in expired:
del self._discovered_device_advertisement_datas[address]
del self._discovered_device_timestamps[address]
@property
def discovered_devices(self) -> list[BLEDevice]:
"""Return a list of discovered devices."""
return [
device_advertisement_data[0]
for device_advertisement_data in self._discovered_device_advertisement_datas.values()
]
@property
def discovered_devices_and_advertisement_data(
self,
) -> dict[str, tuple[BLEDevice, AdvertisementData]]:
"""Return a list of discovered devices and advertisement data."""
return self._discovered_device_advertisement_datas
@hass_callback
def _async_on_advertisement(
self,
address: str,
rssi: int,
local_name: str | None,
service_uuids: list[str],
service_data: dict[str, bytes],
manufacturer_data: dict[int, bytes],
tx_power: int | None,
) -> None:
"""Call the registered callback."""
now = MONOTONIC_TIME()
if prev_discovery := self._discovered_device_advertisement_datas.get(address):
# Merge the new data with the old data
# to function the same as BlueZ which
# merges the dicts on PropertiesChanged
prev_device = prev_discovery[0]
prev_advertisement = prev_discovery[1]
if (
local_name
and prev_device.name
and len(prev_device.name) > len(local_name)
):
local_name = prev_device.name
if prev_advertisement.service_uuids:
service_uuids = list(
set(service_uuids + prev_advertisement.service_uuids)
)
if prev_advertisement.service_data:
service_data = {**prev_advertisement.service_data, **service_data}
if prev_advertisement.manufacturer_data:
manufacturer_data = {
**prev_advertisement.manufacturer_data,
**manufacturer_data,
}
advertisement_data = AdvertisementData(
local_name=None if local_name == "" else local_name,
manufacturer_data=manufacturer_data,
service_data=service_data,
service_uuids=service_uuids,
rssi=rssi,
tx_power=NO_RSSI_VALUE if tx_power is None else tx_power,
platform_data=(),
)
device = BLEDevice( # type: ignore[no-untyped-call]
address=address,
name=local_name,
details=self._details,
rssi=rssi, # deprecated, will be removed in newer bleak
)
self._discovered_device_advertisement_datas[address] = (
device,
advertisement_data,
)
self._discovered_device_timestamps[address] = now
self._new_info_callback(
BluetoothServiceInfoBleak(
name=advertisement_data.local_name or device.name or device.address,
address=device.address,
rssi=rssi,
manufacturer_data=advertisement_data.manufacturer_data,
service_data=advertisement_data.service_data,
service_uuids=advertisement_data.service_uuids,
source=self.source,
device=device,
advertisement=advertisement_data,
connectable=self._connectable,
time=now,
)
)
class HaBleakScannerWrapper(BaseBleakScanner):
"""A wrapper that uses the single instance."""
def __init__(
self,
*args: Any,
detection_callback: AdvertisementDataCallback | None = None,
service_uuids: list[str] | None = None,
**kwargs: Any,
) -> None:
"""Initialize the BleakScanner."""
self._detection_cancel: CALLBACK_TYPE | None = None
self._mapped_filters: dict[str, set[str]] = {}
self._advertisement_data_callback: AdvertisementDataCallback | None = None
remapped_kwargs = {
"detection_callback": detection_callback,
"service_uuids": service_uuids or [],
**kwargs,
}
self._map_filters(*args, **remapped_kwargs)
super().__init__(
detection_callback=detection_callback, service_uuids=service_uuids or []
)
@classmethod
async def discover(cls, timeout: float = 5.0, **kwargs: Any) -> list[BLEDevice]:
"""Discover devices."""
assert MANAGER is not None
return list(MANAGER.async_discovered_devices(True))
async def stop(self, *args: Any, **kwargs: Any) -> None:
"""Stop scanning for devices."""
async def start(self, *args: Any, **kwargs: Any) -> None:
"""Start scanning for devices."""
def _map_filters(self, *args: Any, **kwargs: Any) -> bool:
"""Map the filters."""
mapped_filters = {}
if filters := kwargs.get("filters"):
if filter_uuids := filters.get(FILTER_UUIDS):
mapped_filters[FILTER_UUIDS] = set(filter_uuids)
else:
_LOGGER.warning("Only %s filters are supported", FILTER_UUIDS)
if service_uuids := kwargs.get("service_uuids"):
mapped_filters[FILTER_UUIDS] = set(service_uuids)
if mapped_filters == self._mapped_filters:
return False
self._mapped_filters = mapped_filters
return True
def set_scanning_filter(self, *args: Any, **kwargs: Any) -> None:
"""Set the filters to use."""
if self._map_filters(*args, **kwargs):
self._setup_detection_callback()
def _cancel_callback(self) -> None:
"""Cancel callback."""
if self._detection_cancel:
self._detection_cancel()
self._detection_cancel = None
@property
def discovered_devices(self) -> list[BLEDevice]:
"""Return a list of discovered devices."""
assert MANAGER is not None
return list(MANAGER.async_discovered_devices(True))
def register_detection_callback(
self, callback: AdvertisementDataCallback | None
) -> None:
"""Register a callback that is called when a device is discovered or has a property changed.
This method takes the callback and registers it with the long running
scanner.
"""
self._advertisement_data_callback = callback
self._setup_detection_callback()
def _setup_detection_callback(self) -> None:
"""Set up the detection callback."""
if self._advertisement_data_callback is None:
return
self._cancel_callback()
super().register_detection_callback(self._advertisement_data_callback)
assert MANAGER is not None
assert self._callback is not None
self._detection_cancel = MANAGER.async_register_bleak_callback(
self._callback, self._mapped_filters
)
def __del__(self) -> None:
"""Delete the BleakScanner."""
if self._detection_cancel:
# Nothing to do if event loop is already closed
with contextlib.suppress(RuntimeError):
asyncio.get_running_loop().call_soon_threadsafe(self._detection_cancel)
class HaBleakClientWrapper(BleakClient):
"""Wrap the BleakClient to ensure it does not shutdown our scanner.
If an address is passed into BleakClient instead of a BLEDevice,
bleak will quietly start a new scanner under the hood to resolve
the address. This can cause a conflict with our scanner. We need
to handle translating the address to the BLEDevice in this case
to avoid the whole stack from getting stuck in an in progress state
when an integration does this.
"""
def __init__( # pylint: disable=super-init-not-called, keyword-arg-before-vararg
self,
address_or_ble_device: str | BLEDevice,
disconnected_callback: Callable[[BleakClient], None] | None = None,
*args: Any,
timeout: float = 10.0,
**kwargs: Any,
) -> None:
"""Initialize the BleakClient."""
if isinstance(address_or_ble_device, BLEDevice):
self.__address = address_or_ble_device.address
else:
report(
"attempted to call BleakClient with an address instead of a BLEDevice",
exclude_integrations={"bluetooth"},
error_if_core=False,
)
self.__address = address_or_ble_device
self.__disconnected_callback = disconnected_callback
self.__timeout = timeout
self.__ble_device: BLEDevice | None = None
self._backend: BaseBleakClient | None = None # type: ignore[assignment]
@property
def is_connected(self) -> bool:
"""Return True if the client is connected to a device."""
return self._backend is not None and self._backend.is_connected
def set_disconnected_callback(
self,
callback: Callable[[BleakClient], None] | None,
**kwargs: Any,
) -> None:
"""Set the disconnect callback."""
self.__disconnected_callback = callback
if self._backend:
self._backend.set_disconnected_callback(callback, **kwargs) # type: ignore[arg-type]
async def connect(self, **kwargs: Any) -> bool:
"""Connect to the specified GATT server."""
if (
not self._backend
or not self.__ble_device
or not self._async_get_backend_for_ble_device(self.__ble_device)
):
assert MANAGER is not None
wrapped_backend = (
self._async_get_backend() or self._async_get_fallback_backend()
)
self.__ble_device = (
await freshen_ble_device(wrapped_backend.device)
or wrapped_backend.device
)
self._backend = wrapped_backend.client(
self.__ble_device,
disconnected_callback=self.__disconnected_callback,
timeout=self.__timeout,
hass=MANAGER.hass,
)
return await super().connect(**kwargs)
@hass_callback
def _async_get_backend_for_ble_device(
self, ble_device: BLEDevice
) -> _HaWrappedBleakBackend | None:
"""Get the backend for a BLEDevice."""
details = ble_device.details
if not isinstance(details, dict) or "connector" not in details:
# If client is not defined in details
# its the client for this platform
cls = get_platform_client_backend_type()
return _HaWrappedBleakBackend(ble_device, cls)
connector: HaBluetoothConnector = details["connector"]
# Make sure the backend can connect to the device
# as some backends have connection limits
if not connector.can_connect():
return None
return _HaWrappedBleakBackend(ble_device, connector.client)
@hass_callback
def _async_get_backend(self) -> _HaWrappedBleakBackend | None:
"""Get the bleak backend for the given address."""
assert MANAGER is not None
address = self.__address
ble_device = MANAGER.async_ble_device_from_address(address, True)
if ble_device is None:
raise BleakError(f"No device found for address {address}")
if backend := self._async_get_backend_for_ble_device(ble_device):
return backend
return None
@hass_callback
def _async_get_fallback_backend(self) -> _HaWrappedBleakBackend:
"""Get a fallback backend for the given address."""
#
# The preferred backend cannot currently connect the device
# because it is likely out of connection slots.
#
# We need to try all backends to find one that can
# connect to the device.
#
assert MANAGER is not None
address = self.__address
device_advertisement_datas = (
MANAGER.async_get_discovered_devices_and_advertisement_data_by_address(
address, True
)
)
for device_advertisement_data in sorted(
device_advertisement_datas,
key=lambda device_advertisement_data: device_advertisement_data[1].rssi
or NO_RSSI_VALUE,
reverse=True,
):
if backend := self._async_get_backend_for_ble_device(
device_advertisement_data[0]
):
return backend
raise BleakError(
f"No backend with an available connection slot that can reach address {address} was found"
)
async def disconnect(self) -> bool:
"""Disconnect from the device."""
if self._backend is None:
return True
return await self._backend.disconnect()

View File

@ -25,13 +25,14 @@ from homeassistant.helpers.event import async_track_time_interval
from homeassistant.util.dt import monotonic_time_coarse
from homeassistant.util.package import is_docker_env
from .base_scanner import BaseHaScanner
from .const import (
SCANNER_WATCHDOG_INTERVAL,
SCANNER_WATCHDOG_TIMEOUT,
SOURCE_LOCAL,
START_TIMEOUT,
)
from .models import BaseHaScanner, BluetoothScanningMode, BluetoothServiceInfoBleak
from .models import BluetoothScanningMode, BluetoothServiceInfoBleak
from .util import async_reset_adapter
OriginalBleakScanner = bleak.BleakScanner

View File

@ -6,7 +6,7 @@ import bleak
from bleak.backends.service import BleakGATTServiceCollection
import bleak_retry_connector
from .models import HaBleakClientWrapper, HaBleakScannerWrapper
from .wrappers import HaBleakClientWrapper, HaBleakScannerWrapper
ORIGINAL_BLEAK_SCANNER = bleak.BleakScanner
ORIGINAL_BLEAK_CLIENT = bleak.BleakClient

View File

@ -0,0 +1,274 @@
"""Bleak wrappers for bluetooth."""
from __future__ import annotations
import asyncio
from collections.abc import Callable
import contextlib
from dataclasses import dataclass
import logging
from typing import Any, Final
from bleak import BleakClient, BleakError
from bleak.backends.client import BaseBleakClient, get_platform_client_backend_type
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementDataCallback, BaseBleakScanner
from bleak_retry_connector import NO_RSSI_VALUE, freshen_ble_device
from homeassistant.core import CALLBACK_TYPE, callback as hass_callback
from homeassistant.helpers.frame import report
from . import models
from .models import HaBluetoothConnector
FILTER_UUIDS: Final = "UUIDs"
_LOGGER = logging.getLogger(__name__)
@dataclass
class _HaWrappedBleakBackend:
"""Wrap bleak backend to make it usable by Home Assistant."""
device: BLEDevice
client: type[BaseBleakClient]
class HaBleakScannerWrapper(BaseBleakScanner):
"""A wrapper that uses the single instance."""
def __init__(
self,
*args: Any,
detection_callback: AdvertisementDataCallback | None = None,
service_uuids: list[str] | None = None,
**kwargs: Any,
) -> None:
"""Initialize the BleakScanner."""
self._detection_cancel: CALLBACK_TYPE | None = None
self._mapped_filters: dict[str, set[str]] = {}
self._advertisement_data_callback: AdvertisementDataCallback | None = None
remapped_kwargs = {
"detection_callback": detection_callback,
"service_uuids": service_uuids or [],
**kwargs,
}
self._map_filters(*args, **remapped_kwargs)
super().__init__(
detection_callback=detection_callback, service_uuids=service_uuids or []
)
@classmethod
async def discover(cls, timeout: float = 5.0, **kwargs: Any) -> list[BLEDevice]:
"""Discover devices."""
assert models.MANAGER is not None
return list(models.MANAGER.async_discovered_devices(True))
async def stop(self, *args: Any, **kwargs: Any) -> None:
"""Stop scanning for devices."""
async def start(self, *args: Any, **kwargs: Any) -> None:
"""Start scanning for devices."""
def _map_filters(self, *args: Any, **kwargs: Any) -> bool:
"""Map the filters."""
mapped_filters = {}
if filters := kwargs.get("filters"):
if filter_uuids := filters.get(FILTER_UUIDS):
mapped_filters[FILTER_UUIDS] = set(filter_uuids)
else:
_LOGGER.warning("Only %s filters are supported", FILTER_UUIDS)
if service_uuids := kwargs.get("service_uuids"):
mapped_filters[FILTER_UUIDS] = set(service_uuids)
if mapped_filters == self._mapped_filters:
return False
self._mapped_filters = mapped_filters
return True
def set_scanning_filter(self, *args: Any, **kwargs: Any) -> None:
"""Set the filters to use."""
if self._map_filters(*args, **kwargs):
self._setup_detection_callback()
def _cancel_callback(self) -> None:
"""Cancel callback."""
if self._detection_cancel:
self._detection_cancel()
self._detection_cancel = None
@property
def discovered_devices(self) -> list[BLEDevice]:
"""Return a list of discovered devices."""
assert models.MANAGER is not None
return list(models.MANAGER.async_discovered_devices(True))
def register_detection_callback(
self, callback: AdvertisementDataCallback | None
) -> None:
"""Register a callback that is called when a device is discovered or has a property changed.
This method takes the callback and registers it with the long running
scanner.
"""
self._advertisement_data_callback = callback
self._setup_detection_callback()
def _setup_detection_callback(self) -> None:
"""Set up the detection callback."""
if self._advertisement_data_callback is None:
return
self._cancel_callback()
super().register_detection_callback(self._advertisement_data_callback)
assert models.MANAGER is not None
assert self._callback is not None
self._detection_cancel = models.MANAGER.async_register_bleak_callback(
self._callback, self._mapped_filters
)
def __del__(self) -> None:
"""Delete the BleakScanner."""
if self._detection_cancel:
# Nothing to do if event loop is already closed
with contextlib.suppress(RuntimeError):
asyncio.get_running_loop().call_soon_threadsafe(self._detection_cancel)
class HaBleakClientWrapper(BleakClient):
"""Wrap the BleakClient to ensure it does not shutdown our scanner.
If an address is passed into BleakClient instead of a BLEDevice,
bleak will quietly start a new scanner under the hood to resolve
the address. This can cause a conflict with our scanner. We need
to handle translating the address to the BLEDevice in this case
to avoid the whole stack from getting stuck in an in progress state
when an integration does this.
"""
def __init__( # pylint: disable=super-init-not-called, keyword-arg-before-vararg
self,
address_or_ble_device: str | BLEDevice,
disconnected_callback: Callable[[BleakClient], None] | None = None,
*args: Any,
timeout: float = 10.0,
**kwargs: Any,
) -> None:
"""Initialize the BleakClient."""
if isinstance(address_or_ble_device, BLEDevice):
self.__address = address_or_ble_device.address
else:
report(
"attempted to call BleakClient with an address instead of a BLEDevice",
exclude_integrations={"bluetooth"},
error_if_core=False,
)
self.__address = address_or_ble_device
self.__disconnected_callback = disconnected_callback
self.__timeout = timeout
self.__ble_device: BLEDevice | None = None
self._backend: BaseBleakClient | None = None # type: ignore[assignment]
@property
def is_connected(self) -> bool:
"""Return True if the client is connected to a device."""
return self._backend is not None and self._backend.is_connected
def set_disconnected_callback(
self,
callback: Callable[[BleakClient], None] | None,
**kwargs: Any,
) -> None:
"""Set the disconnect callback."""
self.__disconnected_callback = callback
if self._backend:
self._backend.set_disconnected_callback(callback, **kwargs) # type: ignore[arg-type]
async def connect(self, **kwargs: Any) -> bool:
"""Connect to the specified GATT server."""
if (
not self._backend
or not self.__ble_device
or not self._async_get_backend_for_ble_device(self.__ble_device)
):
assert models.MANAGER is not None
wrapped_backend = (
self._async_get_backend() or self._async_get_fallback_backend()
)
self.__ble_device = (
await freshen_ble_device(wrapped_backend.device)
or wrapped_backend.device
)
self._backend = wrapped_backend.client(
self.__ble_device,
disconnected_callback=self.__disconnected_callback,
timeout=self.__timeout,
hass=models.MANAGER.hass,
)
return await super().connect(**kwargs)
@hass_callback
def _async_get_backend_for_ble_device(
self, ble_device: BLEDevice
) -> _HaWrappedBleakBackend | None:
"""Get the backend for a BLEDevice."""
details = ble_device.details
if not isinstance(details, dict) or "connector" not in details:
# If client is not defined in details
# its the client for this platform
cls = get_platform_client_backend_type()
return _HaWrappedBleakBackend(ble_device, cls)
connector: HaBluetoothConnector = details["connector"]
# Make sure the backend can connect to the device
# as some backends have connection limits
if not connector.can_connect():
return None
return _HaWrappedBleakBackend(ble_device, connector.client)
@hass_callback
def _async_get_backend(self) -> _HaWrappedBleakBackend | None:
"""Get the bleak backend for the given address."""
assert models.MANAGER is not None
address = self.__address
ble_device = models.MANAGER.async_ble_device_from_address(address, True)
if ble_device is None:
raise BleakError(f"No device found for address {address}")
if backend := self._async_get_backend_for_ble_device(ble_device):
return backend
return None
@hass_callback
def _async_get_fallback_backend(self) -> _HaWrappedBleakBackend:
"""Get a fallback backend for the given address."""
#
# The preferred backend cannot currently connect the device
# because it is likely out of connection slots.
#
# We need to try all backends to find one that can
# connect to the device.
#
assert models.MANAGER is not None
address = self.__address
device_advertisement_datas = models.MANAGER.async_get_discovered_devices_and_advertisement_data_by_address(
address, True
)
for device_advertisement_data in sorted(
device_advertisement_datas,
key=lambda device_advertisement_data: device_advertisement_data[1].rssi
or NO_RSSI_VALUE,
reverse=True,
):
if backend := self._async_get_backend_for_ble_device(
device_advertisement_data[0]
):
return backend
raise BleakError(
f"No backend with an available connection slot that can reach address {address} was found"
)
async def disconnect(self) -> bool:
"""Disconnect from the device."""
if self._backend is None:
return True
return await self._backend.disconnect()

View File

@ -5,12 +5,15 @@ import time
from typing import Any
from unittest.mock import patch
from bleak import BleakClient
from bleak.backends.scanner import AdvertisementData, BLEDevice
from bluetooth_adapters import DEFAULT_ADDRESS
from homeassistant.components.bluetooth import (
DOMAIN,
SOURCE_LOCAL,
BluetoothServiceInfo,
BluetoothServiceInfoBleak,
async_get_advertisement_callback,
models,
)
@ -29,6 +32,7 @@ __all__ = (
"patch_all_discovered_devices",
"patch_discovered_devices",
"generate_advertisement_data",
"MockBleakClient",
)
ADVERTISEMENT_DATA_DEFAULTS = {
@ -94,7 +98,7 @@ def inject_advertisement_with_time_and_source_connectable(
) -> None:
"""Inject an advertisement into the manager from a specific source at a time and connectable status."""
async_get_advertisement_callback(hass)(
models.BluetoothServiceInfoBleak(
BluetoothServiceInfoBleak(
name=adv.local_name or device.name or device.address,
address=device.address,
rssi=adv.rssi,
@ -111,7 +115,7 @@ def inject_advertisement_with_time_and_source_connectable(
def inject_bluetooth_service_info_bleak(
hass: HomeAssistant, info: models.BluetoothServiceInfoBleak
hass: HomeAssistant, info: BluetoothServiceInfoBleak
) -> None:
"""Inject an advertisement into the manager with connectable status."""
advertisement_data = generate_advertisement_data(
@ -137,7 +141,7 @@ def inject_bluetooth_service_info_bleak(
def inject_bluetooth_service_info(
hass: HomeAssistant, info: models.BluetoothServiceInfo
hass: HomeAssistant, info: BluetoothServiceInfo
) -> None:
"""Inject a BluetoothServiceInfo into the manager."""
advertisement_data = generate_advertisement_data( # type: ignore[no-untyped-call]
@ -190,3 +194,29 @@ async def _async_setup_with_adapter(
assert await async_setup_component(hass, DOMAIN, {DOMAIN: {}})
await hass.async_block_till_done()
return entry
class MockBleakClient(BleakClient):
"""Mock bleak client."""
def __init__(self, *args, **kwargs):
"""Mock init."""
super().__init__(*args, **kwargs)
self._device_path = "/dev/test"
@property
def is_connected(self) -> bool:
"""Mock connected."""
return True
async def connect(self, *args, **kwargs):
"""Mock connect."""
return True
async def disconnect(self, *args, **kwargs):
"""Mock disconnect."""
pass
async def get_services(self, *args, **kwargs):
"""Mock get_services."""
return []

View File

@ -7,6 +7,7 @@ from unittest.mock import patch
from bleak.backends.scanner import AdvertisementData, BLEDevice
from homeassistant.components.bluetooth import (
BaseHaScanner,
async_register_scanner,
async_track_unavailable,
)
@ -17,7 +18,6 @@ from homeassistant.components.bluetooth.const import (
SOURCE_LOCAL,
UNAVAILABLE_TRACK_SECONDS,
)
from homeassistant.components.bluetooth.models import BaseHaScanner
from homeassistant.core import callback
from homeassistant.util import dt as dt_util

View File

@ -0,0 +1,267 @@
"""Tests for the Bluetooth base scanner models."""
from __future__ import annotations
from datetime import timedelta
import time
from unittest.mock import patch
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData
from homeassistant.components.bluetooth import BaseHaRemoteScanner, HaBluetoothConnector
from homeassistant.components.bluetooth.const import (
CONNECTABLE_FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS,
FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS,
)
import homeassistant.util.dt as dt_util
from . import MockBleakClient, _get_manager, generate_advertisement_data
from tests.common import async_fire_time_changed
async def test_remote_scanner(hass):
"""Test the remote scanner base class merges advertisement_data."""
manager = _get_manager()
switchbot_device = BLEDevice(
"44:44:33:11:23:45",
"wohand",
{},
rssi=-100,
)
switchbot_device_adv = generate_advertisement_data(
local_name="wohand",
service_uuids=["050a021a-0000-1000-8000-00805f9b34fb"],
service_data={"050a021a-0000-1000-8000-00805f9b34fb": b"\n\xff"},
manufacturer_data={1: b"\x01"},
rssi=-100,
)
switchbot_device_2 = BLEDevice(
"44:44:33:11:23:45",
"w",
{},
rssi=-100,
)
switchbot_device_adv_2 = generate_advertisement_data(
local_name="wohand",
service_uuids=["00000001-0000-1000-8000-00805f9b34fb"],
service_data={"00000001-0000-1000-8000-00805f9b34fb": b"\n\xff"},
manufacturer_data={1: b"\x01", 2: b"\x02"},
rssi=-100,
)
class FakeScanner(BaseHaRemoteScanner):
def inject_advertisement(
self, device: BLEDevice, advertisement_data: AdvertisementData
) -> None:
"""Inject an advertisement."""
self._async_on_advertisement(
device.address,
advertisement_data.rssi,
device.name,
advertisement_data.service_uuids,
advertisement_data.service_data,
advertisement_data.manufacturer_data,
advertisement_data.tx_power,
)
new_info_callback = manager.scanner_adv_received
connector = (
HaBluetoothConnector(MockBleakClient, "mock_bleak_client", lambda: False),
)
scanner = FakeScanner(hass, "esp32", new_info_callback, connector, True)
scanner.async_setup()
cancel = manager.async_register_scanner(scanner, True)
scanner.inject_advertisement(switchbot_device, switchbot_device_adv)
data = scanner.discovered_devices_and_advertisement_data
discovered_device, discovered_adv_data = data[switchbot_device.address]
assert discovered_device.address == switchbot_device.address
assert discovered_device.name == switchbot_device.name
assert (
discovered_adv_data.manufacturer_data == switchbot_device_adv.manufacturer_data
)
assert discovered_adv_data.service_data == switchbot_device_adv.service_data
assert discovered_adv_data.service_uuids == switchbot_device_adv.service_uuids
scanner.inject_advertisement(switchbot_device_2, switchbot_device_adv_2)
data = scanner.discovered_devices_and_advertisement_data
discovered_device, discovered_adv_data = data[switchbot_device.address]
assert discovered_device.address == switchbot_device.address
assert discovered_device.name == switchbot_device.name
assert discovered_adv_data.manufacturer_data == {1: b"\x01", 2: b"\x02"}
assert discovered_adv_data.service_data == {
"050a021a-0000-1000-8000-00805f9b34fb": b"\n\xff",
"00000001-0000-1000-8000-00805f9b34fb": b"\n\xff",
}
assert set(discovered_adv_data.service_uuids) == {
"050a021a-0000-1000-8000-00805f9b34fb",
"00000001-0000-1000-8000-00805f9b34fb",
}
cancel()
async def test_remote_scanner_expires_connectable(hass):
"""Test the remote scanner expires stale connectable data."""
manager = _get_manager()
switchbot_device = BLEDevice(
"44:44:33:11:23:45",
"wohand",
{},
rssi=-100,
)
switchbot_device_adv = generate_advertisement_data(
local_name="wohand",
service_uuids=[],
manufacturer_data={1: b"\x01"},
rssi=-100,
)
class FakeScanner(BaseHaRemoteScanner):
def inject_advertisement(
self, device: BLEDevice, advertisement_data: AdvertisementData
) -> None:
"""Inject an advertisement."""
self._async_on_advertisement(
device.address,
advertisement_data.rssi,
device.name,
advertisement_data.service_uuids,
advertisement_data.service_data,
advertisement_data.manufacturer_data,
advertisement_data.tx_power,
)
new_info_callback = manager.scanner_adv_received
connector = (
HaBluetoothConnector(MockBleakClient, "mock_bleak_client", lambda: False),
)
scanner = FakeScanner(hass, "esp32", new_info_callback, connector, True)
scanner.async_setup()
cancel = manager.async_register_scanner(scanner, True)
start_time_monotonic = time.monotonic()
scanner.inject_advertisement(switchbot_device, switchbot_device_adv)
devices = scanner.discovered_devices
assert len(scanner.discovered_devices) == 1
assert len(scanner.discovered_devices_and_advertisement_data) == 1
assert devices[0].name == "wohand"
expire_monotonic = (
start_time_monotonic
+ CONNECTABLE_FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS
+ 1
)
expire_utc = dt_util.utcnow() + timedelta(
seconds=CONNECTABLE_FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS + 1
)
with patch(
"homeassistant.components.bluetooth.base_scanner.MONOTONIC_TIME",
return_value=expire_monotonic,
):
async_fire_time_changed(hass, expire_utc)
await hass.async_block_till_done()
devices = scanner.discovered_devices
assert len(scanner.discovered_devices) == 0
assert len(scanner.discovered_devices_and_advertisement_data) == 0
cancel()
async def test_remote_scanner_expires_non_connectable(hass):
"""Test the remote scanner expires stale non connectable data."""
manager = _get_manager()
switchbot_device = BLEDevice(
"44:44:33:11:23:45",
"wohand",
{},
rssi=-100,
)
switchbot_device_adv = generate_advertisement_data(
local_name="wohand",
service_uuids=[],
manufacturer_data={1: b"\x01"},
rssi=-100,
)
class FakeScanner(BaseHaRemoteScanner):
def inject_advertisement(
self, device: BLEDevice, advertisement_data: AdvertisementData
) -> None:
"""Inject an advertisement."""
self._async_on_advertisement(
device.address,
advertisement_data.rssi,
device.name,
advertisement_data.service_uuids,
advertisement_data.service_data,
advertisement_data.manufacturer_data,
advertisement_data.tx_power,
)
new_info_callback = manager.scanner_adv_received
connector = (
HaBluetoothConnector(MockBleakClient, "mock_bleak_client", lambda: False),
)
scanner = FakeScanner(hass, "esp32", new_info_callback, connector, False)
scanner.async_setup()
cancel = manager.async_register_scanner(scanner, True)
start_time_monotonic = time.monotonic()
scanner.inject_advertisement(switchbot_device, switchbot_device_adv)
devices = scanner.discovered_devices
assert len(scanner.discovered_devices) == 1
assert len(scanner.discovered_devices_and_advertisement_data) == 1
assert devices[0].name == "wohand"
assert (
FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS
> CONNECTABLE_FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS
)
# The connectable timeout is not used for non connectable devices
expire_monotonic = (
start_time_monotonic
+ CONNECTABLE_FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS
+ 1
)
expire_utc = dt_util.utcnow() + timedelta(
seconds=CONNECTABLE_FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS + 1
)
with patch(
"homeassistant.components.bluetooth.base_scanner.MONOTONIC_TIME",
return_value=expire_monotonic,
):
async_fire_time_changed(hass, expire_utc)
await hass.async_block_till_done()
assert len(scanner.discovered_devices) == 1
assert len(scanner.discovered_devices_and_advertisement_data) == 1
# The non connectable timeout is used for non connectable devices
# which is always longer than the connectable timeout
expire_monotonic = (
start_time_monotonic + FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS + 1
)
expire_utc = dt_util.utcnow() + timedelta(
seconds=FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS + 1
)
with patch(
"homeassistant.components.bluetooth.base_scanner.MONOTONIC_TIME",
return_value=expire_monotonic,
):
async_fire_time_changed(hass, expire_utc)
await hass.async_block_till_done()
assert len(scanner.discovered_devices) == 0
assert len(scanner.discovered_devices_and_advertisement_data) == 0
cancel()

View File

@ -11,13 +11,13 @@ import pytest
from homeassistant.components import bluetooth
from homeassistant.components.bluetooth import (
BaseHaScanner,
BluetoothChange,
BluetoothScanningMode,
BluetoothServiceInfo,
async_process_advertisements,
async_rediscover_address,
async_track_unavailable,
models,
scanner,
)
from homeassistant.components.bluetooth.const import (
@ -36,6 +36,7 @@ from homeassistant.components.bluetooth.match import (
SERVICE_DATA_UUID,
SERVICE_UUID,
)
from homeassistant.components.bluetooth.wrappers import HaBleakScannerWrapper
from homeassistant.config_entries import ConfigEntryState
from homeassistant.const import EVENT_HOMEASSISTANT_STARTED, EVENT_HOMEASSISTANT_STOP
from homeassistant.core import HomeAssistant, callback
@ -2210,7 +2211,7 @@ async def test_wrapped_instance_with_filter(
empty_adv = generate_advertisement_data(local_name="empty")
assert _get_manager() is not None
scanner = models.HaBleakScannerWrapper(
scanner = HaBleakScannerWrapper(
filters={"UUIDs": ["cba20d00-224d-11e6-9fb8-0002a5d5c51b"]}
)
scanner.register_detection_callback(_device_detected)
@ -2282,7 +2283,7 @@ async def test_wrapped_instance_with_service_uuids(
empty_adv = generate_advertisement_data(local_name="empty")
assert _get_manager() is not None
scanner = models.HaBleakScannerWrapper(
scanner = HaBleakScannerWrapper(
service_uuids=["cba20d00-224d-11e6-9fb8-0002a5d5c51b"]
)
scanner.register_detection_callback(_device_detected)
@ -2332,7 +2333,7 @@ async def test_wrapped_instance_with_broken_callbacks(
)
assert _get_manager() is not None
scanner = models.HaBleakScannerWrapper(
scanner = HaBleakScannerWrapper(
service_uuids=["cba20d00-224d-11e6-9fb8-0002a5d5c51b"]
)
scanner.register_detection_callback(_device_detected)
@ -2381,7 +2382,7 @@ async def test_wrapped_instance_changes_uuids(
empty_adv = generate_advertisement_data(local_name="empty")
assert _get_manager() is not None
scanner = models.HaBleakScannerWrapper()
scanner = HaBleakScannerWrapper()
scanner.set_scanning_filter(
service_uuids=["cba20d00-224d-11e6-9fb8-0002a5d5c51b"]
)
@ -2436,7 +2437,7 @@ async def test_wrapped_instance_changes_filters(
empty_adv = generate_advertisement_data(local_name="empty")
assert _get_manager() is not None
scanner = models.HaBleakScannerWrapper()
scanner = HaBleakScannerWrapper()
scanner.set_scanning_filter(
filters={"UUIDs": ["cba20d00-224d-11e6-9fb8-0002a5d5c51b"]}
)
@ -2468,7 +2469,7 @@ async def test_wrapped_instance_unsupported_filter(
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
await hass.async_block_till_done()
assert _get_manager() is not None
scanner = models.HaBleakScannerWrapper()
scanner = HaBleakScannerWrapper()
scanner.set_scanning_filter(
filters={
"unsupported": ["cba20d00-224d-11e6-9fb8-0002a5d5c51b"],
@ -2643,12 +2644,12 @@ async def test_no_auto_detect_bluetooth_adapters_windows(hass):
async def test_getting_the_scanner_returns_the_wrapped_instance(hass, enable_bluetooth):
"""Test getting the scanner returns the wrapped instance."""
scanner = bluetooth.async_get_scanner(hass)
assert isinstance(scanner, models.HaBleakScannerWrapper)
assert isinstance(scanner, HaBleakScannerWrapper)
async def test_scanner_count_connectable(hass, enable_bluetooth):
"""Test getting the connectable scanner count."""
scanner = models.BaseHaScanner(hass, "any")
scanner = BaseHaScanner(hass, "any")
cancel = bluetooth.async_register_scanner(hass, scanner, False)
assert bluetooth.async_scanner_count(hass, connectable=True) == 1
cancel()
@ -2656,7 +2657,7 @@ async def test_scanner_count_connectable(hass, enable_bluetooth):
async def test_scanner_count(hass, enable_bluetooth):
"""Test getting the connectable and non-connectable scanner count."""
scanner = models.BaseHaScanner(hass, "any")
scanner = BaseHaScanner(hass, "any")
cancel = bluetooth.async_register_scanner(hass, scanner, False)
assert bluetooth.async_scanner_count(hass, connectable=False) == 2
cancel()

View File

@ -8,7 +8,7 @@ from bluetooth_adapters import AdvertisementHistory
import pytest
from homeassistant.components import bluetooth
from homeassistant.components.bluetooth import models
from homeassistant.components.bluetooth import BaseHaScanner
from homeassistant.components.bluetooth.manager import (
FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS,
)
@ -26,9 +26,7 @@ from . import (
@pytest.fixture
def register_hci0_scanner(hass: HomeAssistant) -> None:
"""Register an hci0 scanner."""
cancel = bluetooth.async_register_scanner(
hass, models.BaseHaScanner(hass, "hci0"), True
)
cancel = bluetooth.async_register_scanner(hass, BaseHaScanner(hass, "hci0"), True)
yield
cancel()
@ -36,9 +34,7 @@ def register_hci0_scanner(hass: HomeAssistant) -> None:
@pytest.fixture
def register_hci1_scanner(hass: HomeAssistant) -> None:
"""Register an hci1 scanner."""
cancel = bluetooth.async_register_scanner(
hass, models.BaseHaScanner(hass, "hci1"), True
)
cancel = bluetooth.async_register_scanner(hass, BaseHaScanner(hass, "hci1"), True)
yield
cancel()
@ -420,7 +416,7 @@ async def test_switching_adapters_when_one_goes_away(
):
"""Test switching adapters when one goes away."""
cancel_hci2 = bluetooth.async_register_scanner(
hass, models.BaseHaScanner(hass, "hci2"), True
hass, BaseHaScanner(hass, "hci2"), True
)
address = "44:44:33:11:23:45"

View File

@ -1,62 +1,29 @@
"""Tests for the Bluetooth integration models."""
from __future__ import annotations
from datetime import timedelta
import time
from unittest.mock import patch
import bleak
from bleak import BleakClient, BleakError
from bleak import BleakError
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData
import pytest
from homeassistant.components.bluetooth.models import (
CONNECTABLE_FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS,
FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS,
BaseHaRemoteScanner,
from homeassistant.components.bluetooth import (
BaseHaScanner,
HaBleakClientWrapper,
HaBleakScannerWrapper,
HaBluetoothConnector,
)
import homeassistant.util.dt as dt_util
from homeassistant.components.bluetooth.wrappers import HaBleakClientWrapper
from . import (
MockBleakClient,
_get_manager,
generate_advertisement_data,
inject_advertisement,
inject_advertisement_with_source,
)
from tests.common import async_fire_time_changed
class MockBleakClient(BleakClient):
"""Mock bleak client."""
def __init__(self, *args, **kwargs):
"""Mock init."""
super().__init__(*args, **kwargs)
self._device_path = "/dev/test"
@property
def is_connected(self) -> bool:
"""Mock connected."""
return True
async def connect(self, *args, **kwargs):
"""Mock connect."""
return True
async def disconnect(self, *args, **kwargs):
"""Mock disconnect."""
pass
async def get_services(self, *args, **kwargs):
"""Mock get_services."""
return []
async def test_wrapped_bleak_scanner(hass, enable_bluetooth):
"""Test wrapped bleak scanner dispatches calls as expected."""
@ -353,250 +320,3 @@ async def test_ble_device_with_proxy_client_out_of_connections_uses_best_availab
client.set_disconnected_callback(lambda client: None)
await client.disconnect()
cancel()
async def test_remote_scanner(hass):
"""Test the remote scanner base class merges advertisement_data."""
manager = _get_manager()
switchbot_device = BLEDevice(
"44:44:33:11:23:45",
"wohand",
{},
rssi=-100,
)
switchbot_device_adv = generate_advertisement_data(
local_name="wohand",
service_uuids=["050a021a-0000-1000-8000-00805f9b34fb"],
service_data={"050a021a-0000-1000-8000-00805f9b34fb": b"\n\xff"},
manufacturer_data={1: b"\x01"},
rssi=-100,
)
switchbot_device_2 = BLEDevice(
"44:44:33:11:23:45",
"w",
{},
rssi=-100,
)
switchbot_device_adv_2 = generate_advertisement_data(
local_name="wohand",
service_uuids=["00000001-0000-1000-8000-00805f9b34fb"],
service_data={"00000001-0000-1000-8000-00805f9b34fb": b"\n\xff"},
manufacturer_data={1: b"\x01", 2: b"\x02"},
rssi=-100,
)
class FakeScanner(BaseHaRemoteScanner):
def inject_advertisement(
self, device: BLEDevice, advertisement_data: AdvertisementData
) -> None:
"""Inject an advertisement."""
self._async_on_advertisement(
device.address,
advertisement_data.rssi,
device.name,
advertisement_data.service_uuids,
advertisement_data.service_data,
advertisement_data.manufacturer_data,
advertisement_data.tx_power,
)
new_info_callback = manager.scanner_adv_received
connector = (
HaBluetoothConnector(MockBleakClient, "mock_bleak_client", lambda: False),
)
scanner = FakeScanner(hass, "esp32", new_info_callback, connector, True)
scanner.async_setup()
cancel = manager.async_register_scanner(scanner, True)
scanner.inject_advertisement(switchbot_device, switchbot_device_adv)
data = scanner.discovered_devices_and_advertisement_data
discovered_device, discovered_adv_data = data[switchbot_device.address]
assert discovered_device.address == switchbot_device.address
assert discovered_device.name == switchbot_device.name
assert (
discovered_adv_data.manufacturer_data == switchbot_device_adv.manufacturer_data
)
assert discovered_adv_data.service_data == switchbot_device_adv.service_data
assert discovered_adv_data.service_uuids == switchbot_device_adv.service_uuids
scanner.inject_advertisement(switchbot_device_2, switchbot_device_adv_2)
data = scanner.discovered_devices_and_advertisement_data
discovered_device, discovered_adv_data = data[switchbot_device.address]
assert discovered_device.address == switchbot_device.address
assert discovered_device.name == switchbot_device.name
assert discovered_adv_data.manufacturer_data == {1: b"\x01", 2: b"\x02"}
assert discovered_adv_data.service_data == {
"050a021a-0000-1000-8000-00805f9b34fb": b"\n\xff",
"00000001-0000-1000-8000-00805f9b34fb": b"\n\xff",
}
assert set(discovered_adv_data.service_uuids) == {
"050a021a-0000-1000-8000-00805f9b34fb",
"00000001-0000-1000-8000-00805f9b34fb",
}
cancel()
async def test_remote_scanner_expires_connectable(hass):
"""Test the remote scanner expires stale connectable data."""
manager = _get_manager()
switchbot_device = BLEDevice(
"44:44:33:11:23:45",
"wohand",
{},
rssi=-100,
)
switchbot_device_adv = generate_advertisement_data(
local_name="wohand",
service_uuids=[],
manufacturer_data={1: b"\x01"},
rssi=-100,
)
class FakeScanner(BaseHaRemoteScanner):
def inject_advertisement(
self, device: BLEDevice, advertisement_data: AdvertisementData
) -> None:
"""Inject an advertisement."""
self._async_on_advertisement(
device.address,
advertisement_data.rssi,
device.name,
advertisement_data.service_uuids,
advertisement_data.service_data,
advertisement_data.manufacturer_data,
advertisement_data.tx_power,
)
new_info_callback = manager.scanner_adv_received
connector = (
HaBluetoothConnector(MockBleakClient, "mock_bleak_client", lambda: False),
)
scanner = FakeScanner(hass, "esp32", new_info_callback, connector, True)
scanner.async_setup()
cancel = manager.async_register_scanner(scanner, True)
start_time_monotonic = time.monotonic()
scanner.inject_advertisement(switchbot_device, switchbot_device_adv)
devices = scanner.discovered_devices
assert len(scanner.discovered_devices) == 1
assert len(scanner.discovered_devices_and_advertisement_data) == 1
assert devices[0].name == "wohand"
expire_monotonic = (
start_time_monotonic
+ CONNECTABLE_FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS
+ 1
)
expire_utc = dt_util.utcnow() + timedelta(
seconds=CONNECTABLE_FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS + 1
)
with patch(
"homeassistant.components.bluetooth.models.MONOTONIC_TIME",
return_value=expire_monotonic,
):
async_fire_time_changed(hass, expire_utc)
await hass.async_block_till_done()
devices = scanner.discovered_devices
assert len(scanner.discovered_devices) == 0
assert len(scanner.discovered_devices_and_advertisement_data) == 0
cancel()
async def test_remote_scanner_expires_non_connectable(hass):
"""Test the remote scanner expires stale non connectable data."""
manager = _get_manager()
switchbot_device = BLEDevice(
"44:44:33:11:23:45",
"wohand",
{},
rssi=-100,
)
switchbot_device_adv = generate_advertisement_data(
local_name="wohand",
service_uuids=[],
manufacturer_data={1: b"\x01"},
rssi=-100,
)
class FakeScanner(BaseHaRemoteScanner):
def inject_advertisement(
self, device: BLEDevice, advertisement_data: AdvertisementData
) -> None:
"""Inject an advertisement."""
self._async_on_advertisement(
device.address,
advertisement_data.rssi,
device.name,
advertisement_data.service_uuids,
advertisement_data.service_data,
advertisement_data.manufacturer_data,
advertisement_data.tx_power,
)
new_info_callback = manager.scanner_adv_received
connector = (
HaBluetoothConnector(MockBleakClient, "mock_bleak_client", lambda: False),
)
scanner = FakeScanner(hass, "esp32", new_info_callback, connector, False)
scanner.async_setup()
cancel = manager.async_register_scanner(scanner, True)
start_time_monotonic = time.monotonic()
scanner.inject_advertisement(switchbot_device, switchbot_device_adv)
devices = scanner.discovered_devices
assert len(scanner.discovered_devices) == 1
assert len(scanner.discovered_devices_and_advertisement_data) == 1
assert devices[0].name == "wohand"
assert (
FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS
> CONNECTABLE_FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS
)
# The connectable timeout is not used for non connectable devices
expire_monotonic = (
start_time_monotonic
+ CONNECTABLE_FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS
+ 1
)
expire_utc = dt_util.utcnow() + timedelta(
seconds=CONNECTABLE_FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS + 1
)
with patch(
"homeassistant.components.bluetooth.models.MONOTONIC_TIME",
return_value=expire_monotonic,
):
async_fire_time_changed(hass, expire_utc)
await hass.async_block_till_done()
assert len(scanner.discovered_devices) == 1
assert len(scanner.discovered_devices_and_advertisement_data) == 1
# The non connectable timeout is used for non connectable devices
# which is always longer than the connectable timeout
expire_monotonic = (
start_time_monotonic + FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS + 1
)
expire_utc = dt_util.utcnow() + timedelta(
seconds=FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS + 1
)
with patch(
"homeassistant.components.bluetooth.models.MONOTONIC_TIME",
return_value=expire_monotonic,
):
async_fire_time_changed(hass, expire_utc)
await hass.async_block_till_done()
assert len(scanner.discovered_devices) == 0
assert len(scanner.discovered_devices_and_advertisement_data) == 0
cancel()

View File

@ -7,14 +7,14 @@ import bleak
from bleak.backends.device import BLEDevice
import bleak_retry_connector
from homeassistant.components.bluetooth.models import (
HaBleakClientWrapper,
HaBleakScannerWrapper,
)
from homeassistant.components.bluetooth.usage import (
install_multiple_bleak_catcher,
uninstall_multiple_bleak_catcher,
)
from homeassistant.components.bluetooth.wrappers import (
HaBleakClientWrapper,
HaBleakScannerWrapper,
)
from . import _get_manager