Migrate ESPHome bleak implementation to bleak-esphome library (#105611)

This commit is contained in:
J. Nick Koston 2023-12-14 07:21:31 -10:00 committed by GitHub
parent 0d9a583f4d
commit 8d1a69ae84
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 20 additions and 1063 deletions

View File

@ -8,6 +8,10 @@ import logging
from typing import Any
from aioesphomeapi import APIClient, BluetoothProxyFeature
from bleak_esphome.backend.cache import ESPHomeBluetoothCache
from bleak_esphome.backend.client import ESPHomeClient, ESPHomeClientData
from bleak_esphome.backend.device import ESPHomeBluetoothDevice
from bleak_esphome.backend.scanner import ESPHomeScanner
from homeassistant.components.bluetooth import (
HaBluetoothConnector,
@ -17,10 +21,6 @@ from homeassistant.config_entries import ConfigEntry
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback as hass_callback
from ..entry_data import RuntimeEntryData
from .cache import ESPHomeBluetoothCache
from .client import ESPHomeClient, ESPHomeClientData
from .device import ESPHomeBluetoothDevice
from .scanner import ESPHomeScanner
_LOGGER = logging.getLogger(__name__)

View File

@ -1,50 +0,0 @@
"""Bluetooth cache for esphome."""
from __future__ import annotations
from collections.abc import MutableMapping
from dataclasses import dataclass, field
from bleak.backends.service import BleakGATTServiceCollection
from lru import LRU # pylint: disable=no-name-in-module
MAX_CACHED_SERVICES = 128
@dataclass(slots=True)
class ESPHomeBluetoothCache:
"""Shared cache between all ESPHome bluetooth devices."""
_gatt_services_cache: MutableMapping[int, BleakGATTServiceCollection] = field(
default_factory=lambda: LRU(MAX_CACHED_SERVICES)
)
_gatt_mtu_cache: MutableMapping[int, int] = field(
default_factory=lambda: LRU(MAX_CACHED_SERVICES)
)
def get_gatt_services_cache(
self, address: int
) -> BleakGATTServiceCollection | None:
"""Get the BleakGATTServiceCollection for the given address."""
return self._gatt_services_cache.get(address)
def set_gatt_services_cache(
self, address: int, services: BleakGATTServiceCollection
) -> None:
"""Set the BleakGATTServiceCollection for the given address."""
self._gatt_services_cache[address] = services
def clear_gatt_services_cache(self, address: int) -> None:
"""Clear the BleakGATTServiceCollection for the given address."""
self._gatt_services_cache.pop(address, None)
def get_gatt_mtu_cache(self, address: int) -> int | None:
"""Get the mtu cache for the given address."""
return self._gatt_mtu_cache.get(address)
def set_gatt_mtu_cache(self, address: int, mtu: int) -> None:
"""Set the mtu cache for the given address."""
self._gatt_mtu_cache[address] = mtu
def clear_gatt_mtu_cache(self, address: int) -> None:
"""Clear the mtu cache for the given address."""
self._gatt_mtu_cache.pop(address, None)

View File

@ -1,95 +0,0 @@
"""BleakGATTCharacteristicESPHome."""
from __future__ import annotations
import contextlib
from uuid import UUID
from aioesphomeapi.model import BluetoothGATTCharacteristic
from bleak.backends.characteristic import BleakGATTCharacteristic
from bleak.backends.descriptor import BleakGATTDescriptor
PROPERTY_MASKS = {
2**n: prop
for n, prop in enumerate(
(
"broadcast",
"read",
"write-without-response",
"write",
"notify",
"indicate",
"authenticated-signed-writes",
"extended-properties",
"reliable-writes",
"writable-auxiliaries",
)
)
}
class BleakGATTCharacteristicESPHome(BleakGATTCharacteristic):
"""GATT Characteristic implementation for the ESPHome backend."""
obj: BluetoothGATTCharacteristic
def __init__(
self,
obj: BluetoothGATTCharacteristic,
max_write_without_response_size: int,
service_uuid: str,
service_handle: int,
) -> None:
"""Init a BleakGATTCharacteristicESPHome."""
super().__init__(obj, max_write_without_response_size)
self.__descriptors: list[BleakGATTDescriptor] = []
self.__service_uuid: str = service_uuid
self.__service_handle: int = service_handle
char_props = self.obj.properties
self.__props: list[str] = [
prop for mask, prop in PROPERTY_MASKS.items() if char_props & mask
]
@property
def service_uuid(self) -> str:
"""Uuid of the Service containing this characteristic."""
return self.__service_uuid
@property
def service_handle(self) -> int:
"""Integer handle of the Service containing this characteristic."""
return self.__service_handle
@property
def handle(self) -> int:
"""Integer handle for this characteristic."""
return self.obj.handle
@property
def uuid(self) -> str:
"""Uuid of this characteristic."""
return self.obj.uuid
@property
def properties(self) -> list[str]:
"""Properties of this characteristic."""
return self.__props
@property
def descriptors(self) -> list[BleakGATTDescriptor]:
"""List of descriptors for this service."""
return self.__descriptors
def get_descriptor(self, specifier: int | str | UUID) -> BleakGATTDescriptor | None:
"""Get a descriptor by handle (int) or UUID (str or uuid.UUID)."""
with contextlib.suppress(StopIteration):
if isinstance(specifier, int):
return next(filter(lambda x: x.handle == specifier, self.descriptors))
return next(filter(lambda x: x.uuid == str(specifier), self.descriptors))
return None
def add_descriptor(self, descriptor: BleakGATTDescriptor) -> None:
"""Add a :py:class:`~BleakGATTDescriptor` to the characteristic.
Should not be used by end user, but rather by `bleak` itself.
"""
self.__descriptors.append(descriptor)

View File

@ -1,718 +0,0 @@
"""Bluetooth client for esphome."""
from __future__ import annotations
import asyncio
from collections.abc import Callable, Coroutine
import contextlib
from dataclasses import dataclass, field
from functools import partial
import logging
import sys
from typing import Any, Concatenate, ParamSpec, TypeVar
import uuid
if sys.version_info < (3, 12):
from typing_extensions import Buffer
else:
from collections.abc import Buffer
from aioesphomeapi import (
ESP_CONNECTION_ERROR_DESCRIPTION,
ESPHOME_GATT_ERRORS,
APIClient,
APIVersion,
BLEConnectionError,
BluetoothConnectionDroppedError,
BluetoothProxyFeature,
DeviceInfo,
)
from aioesphomeapi.core import (
APIConnectionError,
BluetoothGATTAPIError,
TimeoutAPIError,
)
from bleak.backends.characteristic import BleakGATTCharacteristic
from bleak.backends.client import BaseBleakClient, NotifyCallback
from bleak.backends.device import BLEDevice
from bleak.backends.service import BleakGATTServiceCollection
from bleak.exc import BleakError
from homeassistant.core import CALLBACK_TYPE
from .cache import ESPHomeBluetoothCache
from .characteristic import BleakGATTCharacteristicESPHome
from .descriptor import BleakGATTDescriptorESPHome
from .device import ESPHomeBluetoothDevice
from .scanner import ESPHomeScanner
from .service import BleakGATTServiceESPHome
DEFAULT_MTU = 23
GATT_HEADER_SIZE = 3
DISCONNECT_TIMEOUT = 5.0
CONNECT_FREE_SLOT_TIMEOUT = 2.0
GATT_READ_TIMEOUT = 30.0
# CCCD (Characteristic Client Config Descriptor)
CCCD_UUID = "00002902-0000-1000-8000-00805f9b34fb"
CCCD_NOTIFY_BYTES = b"\x01\x00"
CCCD_INDICATE_BYTES = b"\x02\x00"
DEFAULT_MAX_WRITE_WITHOUT_RESPONSE = DEFAULT_MTU - GATT_HEADER_SIZE
_LOGGER = logging.getLogger(__name__)
_ESPHomeClient = TypeVar("_ESPHomeClient", bound="ESPHomeClient")
_R = TypeVar("_R")
_P = ParamSpec("_P")
def mac_to_int(address: str) -> int:
"""Convert a mac address to an integer."""
return int(address.replace(":", ""), 16)
def api_error_as_bleak_error(
func: Callable[Concatenate[_ESPHomeClient, _P], Coroutine[Any, Any, _R]]
) -> Callable[Concatenate[_ESPHomeClient, _P], Coroutine[Any, Any, _R]]:
"""Define a wrapper throw esphome api errors as BleakErrors."""
async def _async_wrap_bluetooth_operation(
self: _ESPHomeClient, *args: _P.args, **kwargs: _P.kwargs
) -> _R:
# pylint: disable=protected-access
try:
return await func(self, *args, **kwargs)
except TimeoutAPIError as err:
raise asyncio.TimeoutError(str(err)) from err
except BluetoothConnectionDroppedError as ex:
_LOGGER.debug(
"%s: BLE device disconnected during %s operation",
self._description,
func.__name__,
)
self._async_ble_device_disconnected()
raise BleakError(str(ex)) from ex
except BluetoothGATTAPIError as ex:
# If the device disconnects in the middle of an operation
# be sure to mark it as disconnected so any library using
# the proxy knows to reconnect.
#
# Because callbacks are delivered asynchronously it's possible
# that we find out about the disconnection during the operation
# before the callback is delivered.
if ex.error.error == -1:
_LOGGER.debug(
"%s: BLE device disconnected during %s operation",
self._description,
func.__name__,
)
self._async_ble_device_disconnected()
raise BleakError(str(ex)) from ex
except APIConnectionError as err:
raise BleakError(str(err)) from err
return _async_wrap_bluetooth_operation
@dataclass(slots=True)
class ESPHomeClientData:
"""Define a class that stores client data for an esphome client."""
bluetooth_device: ESPHomeBluetoothDevice
cache: ESPHomeBluetoothCache
client: APIClient
device_info: DeviceInfo
api_version: APIVersion
title: str
scanner: ESPHomeScanner | None
disconnect_callbacks: set[Callable[[], None]] = field(default_factory=set)
class ESPHomeClient(BaseBleakClient):
"""ESPHome Bleak Client."""
def __init__(
self,
address_or_ble_device: BLEDevice | str,
*args: Any,
client_data: ESPHomeClientData,
**kwargs: Any,
) -> None:
"""Initialize the ESPHomeClient."""
device_info = client_data.device_info
self._disconnect_callbacks = client_data.disconnect_callbacks
assert isinstance(address_or_ble_device, BLEDevice)
super().__init__(address_or_ble_device, *args, **kwargs)
self._loop = asyncio.get_running_loop()
ble_device = address_or_ble_device
self._ble_device = ble_device
self._address_as_int = mac_to_int(ble_device.address)
assert ble_device.details is not None
self._source = ble_device.details["source"]
self._cache = client_data.cache
self._bluetooth_device = client_data.bluetooth_device
self._client = client_data.client
self._is_connected = False
self._mtu: int | None = None
self._cancel_connection_state: CALLBACK_TYPE | None = None
self._notify_cancels: dict[
int, tuple[Callable[[], Coroutine[Any, Any, None]], Callable[[], None]]
] = {}
self._device_info = client_data.device_info
self._feature_flags = device_info.bluetooth_proxy_feature_flags_compat(
client_data.api_version
)
self._address_type = ble_device.details["address_type"]
self._source_name = f"{client_data.title} [{self._source}]"
self._description = (
f"{self._source_name}: {ble_device.name} - {ble_device.address}"
)
scanner = client_data.scanner
assert scanner is not None
self._scanner = scanner
def __str__(self) -> str:
"""Return the string representation of the client."""
return f"ESPHomeClient ({self._description})"
def _async_disconnected_cleanup(self) -> None:
"""Clean up on disconnect."""
self.services = BleakGATTServiceCollection() # type: ignore[no-untyped-call]
self._is_connected = False
for _, notify_abort in self._notify_cancels.values():
notify_abort()
self._notify_cancels.clear()
self._disconnect_callbacks.discard(self._async_esp_disconnected)
if self._cancel_connection_state:
self._cancel_connection_state()
self._cancel_connection_state = None
def _async_ble_device_disconnected(self) -> None:
"""Handle the BLE device disconnecting from the ESP."""
was_connected = self._is_connected
self._async_disconnected_cleanup()
if was_connected:
_LOGGER.debug("%s: BLE device disconnected", self._description)
self._async_call_bleak_disconnected_callback()
def _async_esp_disconnected(self) -> None:
"""Handle the esp32 client disconnecting from us."""
_LOGGER.debug("%s: ESP device disconnected", self._description)
# Calling _async_ble_device_disconnected calls
# _async_disconnected_cleanup which will also remove
# the disconnect callbacks
self._async_ble_device_disconnected()
def _async_call_bleak_disconnected_callback(self) -> None:
"""Call the disconnected callback to inform the bleak consumer."""
if self._disconnected_callback:
self._disconnected_callback()
self._disconnected_callback = None
def _on_bluetooth_connection_state(
self,
connected_future: asyncio.Future[bool],
connected: bool,
mtu: int,
error: int,
) -> None:
"""Handle a connect or disconnect."""
_LOGGER.debug(
"%s: Connection state changed to connected=%s mtu=%s error=%s",
self._description,
connected,
mtu,
error,
)
if connected:
self._is_connected = True
if not self._mtu:
self._mtu = mtu
self._cache.set_gatt_mtu_cache(self._address_as_int, mtu)
else:
self._async_ble_device_disconnected()
if connected_future.done():
return
if error:
try:
ble_connection_error = BLEConnectionError(error)
ble_connection_error_name = ble_connection_error.name
human_error = ESP_CONNECTION_ERROR_DESCRIPTION[ble_connection_error]
except (KeyError, ValueError):
ble_connection_error_name = str(error)
human_error = ESPHOME_GATT_ERRORS.get(
error, f"Unknown error code {error}"
)
connected_future.set_exception(
BleakError(
f"Error {ble_connection_error_name} while connecting:"
f" {human_error}"
)
)
return
if not connected:
connected_future.set_exception(BleakError("Disconnected"))
return
_LOGGER.debug(
"%s: connected, registering for disconnected callbacks",
self._description,
)
self._disconnect_callbacks.add(self._async_esp_disconnected)
connected_future.set_result(connected)
@api_error_as_bleak_error
async def connect(
self, dangerous_use_bleak_cache: bool = False, **kwargs: Any
) -> bool:
"""Connect to a specified Peripheral.
**kwargs:
timeout (float): Timeout for required
``BleakScanner.find_device_by_address`` call. Defaults to 10.0.
Returns:
Boolean representing connection status.
"""
await self._wait_for_free_connection_slot(CONNECT_FREE_SLOT_TIMEOUT)
cache = self._cache
self._mtu = cache.get_gatt_mtu_cache(self._address_as_int)
has_cache = bool(
dangerous_use_bleak_cache
and self._feature_flags & BluetoothProxyFeature.REMOTE_CACHING
and cache.get_gatt_services_cache(self._address_as_int)
and self._mtu
)
connected_future: asyncio.Future[bool] = self._loop.create_future()
timeout = kwargs.get("timeout", self._timeout)
with self._scanner.connecting():
try:
self._cancel_connection_state = (
await self._client.bluetooth_device_connect(
self._address_as_int,
partial(self._on_bluetooth_connection_state, connected_future),
timeout=timeout,
has_cache=has_cache,
feature_flags=self._feature_flags,
address_type=self._address_type,
)
)
except asyncio.CancelledError:
if connected_future.done():
with contextlib.suppress(BleakError):
# If we are cancelled while connecting,
# we need to make sure we await the future
# to avoid a warning about an un-retrieved
# exception.
await connected_future
raise
except Exception as ex:
if connected_future.done():
with contextlib.suppress(BleakError):
# If the connect call throws an exception,
# we need to make sure we await the future
# to avoid a warning about an un-retrieved
# exception since we prefer to raise the
# exception from the connect call as it
# will be more descriptive.
await connected_future
connected_future.cancel(f"Unhandled exception in connect call: {ex}")
raise
await connected_future
try:
await self._get_services(
dangerous_use_bleak_cache=dangerous_use_bleak_cache
)
except asyncio.CancelledError:
# On cancel we must still raise cancelled error
# to avoid blocking the cancellation even if the
# disconnect call fails.
with contextlib.suppress(Exception):
await self._disconnect()
raise
except Exception:
await self._disconnect()
raise
return True
@api_error_as_bleak_error
async def disconnect(self) -> bool:
"""Disconnect from the peripheral device."""
return await self._disconnect()
async def _disconnect(self) -> bool:
await self._client.bluetooth_device_disconnect(self._address_as_int)
self._async_ble_device_disconnected()
await self._wait_for_free_connection_slot(DISCONNECT_TIMEOUT)
return True
async def _wait_for_free_connection_slot(self, timeout: float) -> None:
"""Wait for a free connection slot."""
bluetooth_device = self._bluetooth_device
if bluetooth_device.ble_connections_free:
return
_LOGGER.debug(
"%s: Out of connection slots, waiting for a free one",
self._description,
)
async with asyncio.timeout(timeout):
await bluetooth_device.wait_for_ble_connections_free()
@property
def is_connected(self) -> bool:
"""Is Connected."""
return self._is_connected
@property
def mtu_size(self) -> int:
"""Get ATT MTU size for active connection."""
return self._mtu or DEFAULT_MTU
@api_error_as_bleak_error
async def pair(self, *args: Any, **kwargs: Any) -> bool:
"""Attempt to pair."""
if not self._feature_flags & BluetoothProxyFeature.PAIRING:
raise NotImplementedError(
"Pairing is not available in this version ESPHome; "
f"Upgrade the ESPHome version on the {self._device_info.name} device."
)
self._raise_if_not_connected()
response = await self._client.bluetooth_device_pair(self._address_as_int)
if response.paired:
return True
_LOGGER.error(
"%s: Pairing failed due to error: %s", self._description, response.error
)
return False
@api_error_as_bleak_error
async def unpair(self) -> bool:
"""Attempt to unpair."""
if not self._feature_flags & BluetoothProxyFeature.PAIRING:
raise NotImplementedError(
"Unpairing is not available in this version ESPHome; "
f"Upgrade the ESPHome version on the {self._device_info.name} device."
)
self._raise_if_not_connected()
response = await self._client.bluetooth_device_unpair(self._address_as_int)
if response.success:
return True
_LOGGER.error(
"%s: Unpairing failed due to error: %s", self._description, response.error
)
return False
@api_error_as_bleak_error
async def get_services(
self, dangerous_use_bleak_cache: bool = False, **kwargs: Any
) -> BleakGATTServiceCollection:
"""Get all services registered for this GATT server.
Returns:
A :py:class:`bleak.backends.service.BleakGATTServiceCollection`
with this device's services tree.
"""
return await self._get_services(
dangerous_use_bleak_cache=dangerous_use_bleak_cache, **kwargs
)
async def _get_services(
self, dangerous_use_bleak_cache: bool = False, **kwargs: Any
) -> BleakGATTServiceCollection:
"""Get all services registered for this GATT server.
Must only be called from get_services or connected
"""
self._raise_if_not_connected()
address_as_int = self._address_as_int
cache = self._cache
# If the connection version >= 3, we must use the cache
# because the esp has already wiped the services list to
# save memory.
if (
self._feature_flags & BluetoothProxyFeature.REMOTE_CACHING
or dangerous_use_bleak_cache
) and (cached_services := cache.get_gatt_services_cache(address_as_int)):
_LOGGER.debug("%s: Cached services hit", self._description)
self.services = cached_services
return self.services
_LOGGER.debug("%s: Cached services miss", self._description)
esphome_services = await self._client.bluetooth_gatt_get_services(
address_as_int
)
_LOGGER.debug("%s: Got services: %s", self._description, esphome_services)
max_write_without_response = self.mtu_size - GATT_HEADER_SIZE
services = BleakGATTServiceCollection() # type: ignore[no-untyped-call]
for service in esphome_services.services:
services.add_service(BleakGATTServiceESPHome(service))
for characteristic in service.characteristics:
services.add_characteristic(
BleakGATTCharacteristicESPHome(
characteristic,
max_write_without_response,
service.uuid,
service.handle,
)
)
for descriptor in characteristic.descriptors:
services.add_descriptor(
BleakGATTDescriptorESPHome(
descriptor,
characteristic.uuid,
characteristic.handle,
)
)
if not esphome_services.services:
# If we got no services, we must have disconnected
# or something went wrong on the ESP32's BLE stack.
raise BleakError("Failed to get services from remote esp")
self.services = services
_LOGGER.debug("%s: Cached services saved", self._description)
cache.set_gatt_services_cache(address_as_int, services)
return services
def _resolve_characteristic(
self, char_specifier: BleakGATTCharacteristic | int | str | uuid.UUID
) -> BleakGATTCharacteristic:
"""Resolve a characteristic specifier to a BleakGATTCharacteristic object."""
if (services := self.services) is None:
raise BleakError(f"{self._description}: Services have not been resolved")
if not isinstance(char_specifier, BleakGATTCharacteristic):
characteristic = services.get_characteristic(char_specifier)
else:
characteristic = char_specifier
if not characteristic:
raise BleakError(
f"{self._description}: Characteristic {char_specifier} was not found!"
)
return characteristic
@api_error_as_bleak_error
async def clear_cache(self) -> bool:
"""Clear the GATT cache."""
cache = self._cache
cache.clear_gatt_services_cache(self._address_as_int)
cache.clear_gatt_mtu_cache(self._address_as_int)
if not self._feature_flags & BluetoothProxyFeature.CACHE_CLEARING:
_LOGGER.warning(
"On device cache clear is not available with this ESPHome version; "
"Upgrade the ESPHome version on the device %s; Only memory cache will be cleared",
self._device_info.name,
)
return True
self._raise_if_not_connected()
response = await self._client.bluetooth_device_clear_cache(self._address_as_int)
if response.success:
return True
_LOGGER.error(
"%s: Clear cache failed due to error: %s",
self._description,
response.error,
)
return False
@api_error_as_bleak_error
async def read_gatt_char(
self,
char_specifier: BleakGATTCharacteristic | int | str | uuid.UUID,
**kwargs: Any,
) -> bytearray:
"""Perform read operation on the specified GATT characteristic.
Args:
char_specifier (BleakGATTCharacteristic, int, str or UUID):
The characteristic to read from, specified by either integer
handle, UUID or directly by the BleakGATTCharacteristic
object representing it.
**kwargs: Unused
Returns:
(bytearray) The read data.
"""
self._raise_if_not_connected()
characteristic = self._resolve_characteristic(char_specifier)
return await self._client.bluetooth_gatt_read(
self._address_as_int, characteristic.handle, GATT_READ_TIMEOUT
)
@api_error_as_bleak_error
async def read_gatt_descriptor(self, handle: int, **kwargs: Any) -> bytearray:
"""Perform read operation on the specified GATT descriptor.
Args:
handle (int): The handle of the descriptor to read from.
**kwargs: Unused
Returns:
(bytearray) The read data.
"""
self._raise_if_not_connected()
return await self._client.bluetooth_gatt_read_descriptor(
self._address_as_int, handle, GATT_READ_TIMEOUT
)
@api_error_as_bleak_error
async def write_gatt_char(
self,
characteristic: BleakGATTCharacteristic | int | str | uuid.UUID,
data: Buffer,
response: bool = False,
) -> None:
"""Perform a write operation of the specified GATT characteristic.
Args:
characteristic (BleakGATTCharacteristic, int, str or UUID):
The characteristic to write to, specified by either integer
handle, UUID or directly by the BleakGATTCharacteristic object
representing it.
data (bytes or bytearray): The data to send.
response (bool): If write-with-response operation should be done.
Defaults to `False`.
"""
self._raise_if_not_connected()
characteristic = self._resolve_characteristic(characteristic)
await self._client.bluetooth_gatt_write(
self._address_as_int, characteristic.handle, bytes(data), response
)
@api_error_as_bleak_error
async def write_gatt_descriptor(self, handle: int, data: Buffer) -> None:
"""Perform a write operation on the specified GATT descriptor.
Args:
handle (int): The handle of the descriptor to read from.
data (bytes or bytearray): The data to send.
"""
self._raise_if_not_connected()
await self._client.bluetooth_gatt_write_descriptor(
self._address_as_int, handle, bytes(data)
)
@api_error_as_bleak_error
async def start_notify(
self,
characteristic: BleakGATTCharacteristic,
callback: NotifyCallback,
**kwargs: Any,
) -> None:
"""Activate notifications/indications on a characteristic.
Callbacks must accept two inputs. The first will be a integer handle of the
characteristic generating the data and the second will be a ``bytearray``
containing the data sent from the connected server.
.. code-block:: python
def callback(sender: int, data: bytearray):
print(f"{sender}: {data}")
client.start_notify(char_uuid, callback)
Args:
characteristic (BleakGATTCharacteristic):
The characteristic to activate notifications/indications on a
characteristic, specified by either integer handle, UUID or
directly by the BleakGATTCharacteristic object representing it.
callback (function): The function to be called on notification.
kwargs: Unused.
"""
self._raise_if_not_connected()
ble_handle = characteristic.handle
if ble_handle in self._notify_cancels:
raise BleakError(
f"{self._description}: Notifications are already enabled on "
f"service:{characteristic.service_uuid} "
f"characteristic:{characteristic.uuid} "
f"handle:{ble_handle}"
)
if (
"notify" not in characteristic.properties
and "indicate" not in characteristic.properties
):
raise BleakError(
f"{self._description}: Characteristic {characteristic.uuid} "
"does not have notify or indicate property set."
)
self._notify_cancels[
ble_handle
] = await self._client.bluetooth_gatt_start_notify(
self._address_as_int,
ble_handle,
lambda handle, data: callback(data),
)
if not self._feature_flags & BluetoothProxyFeature.REMOTE_CACHING:
return
# For connection v3 we are responsible for enabling notifications
# on the cccd (characteristic client config descriptor) handle since
# the esp32 will not have resolved the characteristic descriptors to
# save memory since doing so can exhaust the memory and cause a soft
# reset
cccd_descriptor = characteristic.get_descriptor(CCCD_UUID)
if not cccd_descriptor:
raise BleakError(
f"{self._description}: Characteristic {characteristic.uuid} "
"does not have a characteristic client config descriptor."
)
_LOGGER.debug(
"%s: Writing to CCD descriptor %s for notifications with properties=%s",
self._description,
cccd_descriptor.handle,
characteristic.properties,
)
supports_notify = "notify" in characteristic.properties
await self._client.bluetooth_gatt_write_descriptor(
self._address_as_int,
cccd_descriptor.handle,
CCCD_NOTIFY_BYTES if supports_notify else CCCD_INDICATE_BYTES,
wait_for_response=False,
)
@api_error_as_bleak_error
async def stop_notify(
self,
char_specifier: BleakGATTCharacteristic | int | str | uuid.UUID,
) -> None:
"""Deactivate notification/indication on a specified characteristic.
Args:
char_specifier (BleakGATTCharacteristic, int, str or UUID):
The characteristic to deactivate notification/indication on,
specified by either integer handle, UUID or directly by the
BleakGATTCharacteristic object representing it.
"""
self._raise_if_not_connected()
characteristic = self._resolve_characteristic(char_specifier)
# Do not raise KeyError if notifications are not enabled on this characteristic
# to be consistent with the behavior of the BlueZ backend
if notify_cancel := self._notify_cancels.pop(characteristic.handle, None):
notify_stop, _ = notify_cancel
await notify_stop()
def _raise_if_not_connected(self) -> None:
"""Raise a BleakError if not connected."""
if not self._is_connected:
raise BleakError(f"{self._description} is not connected")
def __del__(self) -> None:
"""Destructor to make sure the connection state is unsubscribed."""
if self._cancel_connection_state:
_LOGGER.warning(
(
"%s: ESPHomeClient bleak client was not properly"
" disconnected before destruction"
),
self._description,
)
if not self._loop.is_closed():
self._loop.call_soon_threadsafe(self._async_disconnected_cleanup)

View File

@ -1,42 +0,0 @@
"""BleakGATTDescriptorESPHome."""
from __future__ import annotations
from aioesphomeapi.model import BluetoothGATTDescriptor
from bleak.backends.descriptor import BleakGATTDescriptor
class BleakGATTDescriptorESPHome(BleakGATTDescriptor):
"""GATT Descriptor implementation for ESPHome backend."""
obj: BluetoothGATTDescriptor
def __init__(
self,
obj: BluetoothGATTDescriptor,
characteristic_uuid: str,
characteristic_handle: int,
) -> None:
"""Init a BleakGATTDescriptorESPHome."""
super().__init__(obj)
self.__characteristic_uuid: str = characteristic_uuid
self.__characteristic_handle: int = characteristic_handle
@property
def characteristic_handle(self) -> int:
"""Handle for the characteristic that this descriptor belongs to."""
return self.__characteristic_handle
@property
def characteristic_uuid(self) -> str:
"""UUID for the characteristic that this descriptor belongs to."""
return self.__characteristic_uuid
@property
def uuid(self) -> str:
"""UUID for this descriptor."""
return self.obj.uuid
@property
def handle(self) -> int:
"""Integer handle for this descriptor."""
return self.obj.handle

View File

@ -1,55 +0,0 @@
"""Bluetooth device models for esphome."""
from __future__ import annotations
import asyncio
from dataclasses import dataclass, field
import logging
from homeassistant.core import callback
_LOGGER = logging.getLogger(__name__)
@dataclass(slots=True)
class ESPHomeBluetoothDevice:
"""Bluetooth data for a specific ESPHome device."""
name: str
mac_address: str
ble_connections_free: int = 0
ble_connections_limit: int = 0
_ble_connection_free_futures: list[asyncio.Future[int]] = field(
default_factory=list
)
loop: asyncio.AbstractEventLoop = field(default_factory=asyncio.get_running_loop)
@callback
def async_update_ble_connection_limits(self, free: int, limit: int) -> None:
"""Update the BLE connection limits."""
_LOGGER.debug(
"%s [%s]: BLE connection limits: used=%s free=%s limit=%s",
self.name,
self.mac_address,
limit - free,
free,
limit,
)
self.ble_connections_free = free
self.ble_connections_limit = limit
if not free:
return
for fut in self._ble_connection_free_futures:
# If wait_for_ble_connections_free gets cancelled, it will
# leave a future in the list. We need to check if it's done
# before setting the result.
if not fut.done():
fut.set_result(free)
self._ble_connection_free_futures.clear()
async def wait_for_ble_connections_free(self) -> int:
"""Wait until there are free BLE connections."""
if self.ble_connections_free > 0:
return self.ble_connections_free
fut: asyncio.Future[int] = self.loop.create_future()
self._ble_connection_free_futures.append(fut)
return await fut

View File

@ -1,48 +0,0 @@
"""Bluetooth scanner for esphome."""
from __future__ import annotations
from aioesphomeapi import BluetoothLEAdvertisement, BluetoothLERawAdvertisementsResponse
from bluetooth_data_tools import (
int_to_bluetooth_address,
parse_advertisement_data_tuple,
)
from homeassistant.components.bluetooth import MONOTONIC_TIME, BaseHaRemoteScanner
from homeassistant.core import callback
class ESPHomeScanner(BaseHaRemoteScanner):
"""Scanner for esphome."""
__slots__ = ()
@callback
def async_on_advertisement(self, adv: BluetoothLEAdvertisement) -> None:
"""Call the registered callback."""
# The mac address is a uint64, but we need a string
self._async_on_advertisement(
int_to_bluetooth_address(adv.address),
adv.rssi,
adv.name,
adv.service_uuids,
adv.service_data,
adv.manufacturer_data,
None,
{"address_type": adv.address_type},
MONOTONIC_TIME(),
)
@callback
def async_on_raw_advertisements(
self, raw: BluetoothLERawAdvertisementsResponse
) -> None:
"""Call the registered callback."""
now = MONOTONIC_TIME()
for adv in raw.advertisements:
self._async_on_advertisement(
int_to_bluetooth_address(adv.address),
adv.rssi,
*parse_advertisement_data_tuple((adv.data,)),
{"address_type": adv.address_type},
now,
)

View File

@ -1,40 +0,0 @@
"""BleakGATTServiceESPHome."""
from __future__ import annotations
from aioesphomeapi.model import BluetoothGATTService
from bleak.backends.characteristic import BleakGATTCharacteristic
from bleak.backends.service import BleakGATTService
class BleakGATTServiceESPHome(BleakGATTService):
"""GATT Characteristic implementation for the ESPHome backend."""
obj: BluetoothGATTService
def __init__(self, obj: BluetoothGATTService) -> None:
"""Init a BleakGATTServiceESPHome."""
super().__init__(obj) # type: ignore[no-untyped-call]
self.__characteristics: list[BleakGATTCharacteristic] = []
self.__handle: int = self.obj.handle
@property
def handle(self) -> int:
"""Integer handle of this service."""
return self.__handle
@property
def uuid(self) -> str:
"""UUID for this service."""
return self.obj.uuid
@property
def characteristics(self) -> list[BleakGATTCharacteristic]:
"""List of characteristics for this service."""
return self.__characteristics
def add_characteristic(self, characteristic: BleakGATTCharacteristic) -> None:
"""Add a :py:class:`~BleakGATTCharacteristicESPHome` to the service.
Should not be used by end user, but rather by `bleak` itself.
"""
self.__characteristics.append(characteristic)

View File

@ -4,11 +4,12 @@ from __future__ import annotations
from dataclasses import dataclass, field
from typing import Self, cast
from bleak_esphome.backend.cache import ESPHomeBluetoothCache
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.json import JSONEncoder
from .bluetooth.cache import ESPHomeBluetoothCache
from .const import DOMAIN
from .entry_data import ESPHomeStorage, RuntimeEntryData

View File

@ -35,6 +35,7 @@ from aioesphomeapi import (
build_unique_id,
)
from aioesphomeapi.model import ButtonInfo
from bleak_esphome.backend.device import ESPHomeBluetoothDevice
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import Platform
@ -43,7 +44,6 @@ from homeassistant.helpers import entity_registry as er
from homeassistant.helpers.dispatcher import async_dispatcher_send
from homeassistant.helpers.storage import Store
from .bluetooth.device import ESPHomeBluetoothDevice
from .const import DOMAIN
from .dashboard import async_get_dashboard

View File

@ -13,11 +13,12 @@
"documentation": "https://www.home-assistant.io/integrations/esphome",
"integration_type": "device",
"iot_class": "local_push",
"loggers": ["aioesphomeapi", "noiseprotocol"],
"loggers": ["aioesphomeapi", "noiseprotocol", "bleak_esphome"],
"requirements": [
"aioesphomeapi==21.0.0",
"bluetooth-data-tools==1.18.0",
"esphome-dashboard-api==1.2.3"
"esphome-dashboard-api==1.2.3",
"bleak-esphome==0.2.0"
],
"zeroconf": ["_esphomelib._tcp.local."]
}

View File

@ -534,6 +534,9 @@ bimmer-connected[china]==0.14.6
# homeassistant.components.bizkaibus
bizkaibus==0.1.1
# homeassistant.components.esphome
bleak-esphome==0.2.0
# homeassistant.components.bluetooth
bleak-retry-connector==3.3.0

View File

@ -453,6 +453,9 @@ bellows==0.37.3
# homeassistant.components.bmw_connected_drive
bimmer-connected[china]==0.14.6
# homeassistant.components.esphome
bleak-esphome==0.2.0
# homeassistant.components.bluetooth
bleak-retry-connector==3.3.0

View File

@ -3,16 +3,13 @@ from __future__ import annotations
from aioesphomeapi import APIClient, APIVersion, BluetoothProxyFeature, DeviceInfo
from bleak.exc import BleakError
from bleak_esphome.backend.cache import ESPHomeBluetoothCache
from bleak_esphome.backend.client import ESPHomeClient, ESPHomeClientData
from bleak_esphome.backend.device import ESPHomeBluetoothDevice
from bleak_esphome.backend.scanner import ESPHomeScanner
import pytest
from homeassistant.components.bluetooth import HaBluetoothConnector
from homeassistant.components.esphome.bluetooth.cache import ESPHomeBluetoothCache
from homeassistant.components.esphome.bluetooth.client import (
ESPHomeClient,
ESPHomeClientData,
)
from homeassistant.components.esphome.bluetooth.device import ESPHomeBluetoothDevice
from homeassistant.components.esphome.bluetooth.scanner import ESPHomeScanner
from homeassistant.core import HomeAssistant
from tests.components.bluetooth import generate_ble_device