Update bluetooth_le_tracker to use Bleak (#75013)

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>
This commit is contained in:
J. Nick Koston 2022-07-18 19:16:24 -05:00 committed by GitHub
parent 41e4b38c3a
commit b37f15b1d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 370 additions and 106 deletions

View File

@ -2,19 +2,20 @@
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
from collections.abc import Callable from collections.abc import Awaitable, Callable
from datetime import datetime, timedelta from datetime import datetime, timedelta
import logging import logging
from uuid import UUID from uuid import UUID
import pygatt from bleak import BleakClient, BleakError
from bleak.backends.device import BLEDevice
import voluptuous as vol import voluptuous as vol
from homeassistant.components import bluetooth
from homeassistant.components.device_tracker import ( from homeassistant.components.device_tracker import (
PLATFORM_SCHEMA as PARENT_PLATFORM_SCHEMA, PLATFORM_SCHEMA as PARENT_PLATFORM_SCHEMA,
) )
from homeassistant.components.device_tracker.const import ( from homeassistant.components.device_tracker.const import (
CONF_SCAN_INTERVAL,
CONF_TRACK_NEW, CONF_TRACK_NEW,
SCAN_INTERVAL, SCAN_INTERVAL,
SOURCE_TYPE_BLUETOOTH_LE, SOURCE_TYPE_BLUETOOTH_LE,
@ -23,10 +24,10 @@ from homeassistant.components.device_tracker.legacy import (
YAML_DEVICES, YAML_DEVICES,
async_load_config, async_load_config,
) )
from homeassistant.const import EVENT_HOMEASSISTANT_STOP from homeassistant.const import CONF_SCAN_INTERVAL, EVENT_HOMEASSISTANT_STOP
from homeassistant.core import HomeAssistant from homeassistant.core import Event, HomeAssistant, callback
import homeassistant.helpers.config_validation as cv import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.event import track_point_in_utc_time from homeassistant.helpers.event import async_track_time_interval
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
import homeassistant.util.dt as dt_util import homeassistant.util.dt as dt_util
@ -53,33 +54,31 @@ PLATFORM_SCHEMA = PARENT_PLATFORM_SCHEMA.extend(
) )
def setup_scanner( # noqa: C901 async def async_setup_scanner( # noqa: C901
hass: HomeAssistant, hass: HomeAssistant,
config: ConfigType, config: ConfigType,
see: Callable[..., None], async_see: Callable[..., Awaitable[None]],
discovery_info: DiscoveryInfoType | None = None, discovery_info: DiscoveryInfoType | None = None,
) -> bool: ) -> bool:
"""Set up the Bluetooth LE Scanner.""" """Set up the Bluetooth LE Scanner."""
new_devices: dict[str, dict] = {} new_devices: dict[str, dict] = {}
hass.data.setdefault(DATA_BLE, {DATA_BLE_ADAPTER: None})
def handle_stop(event):
"""Try to shut down the bluetooth child process nicely."""
# These should never be unset at the point this runs, but just for
# safety's sake, use `get`.
adapter = hass.data.get(DATA_BLE, {}).get(DATA_BLE_ADAPTER)
if adapter is not None:
adapter.kill()
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, handle_stop)
if config[CONF_TRACK_BATTERY]: if config[CONF_TRACK_BATTERY]:
battery_track_interval = config[CONF_TRACK_BATTERY_INTERVAL] battery_track_interval = config[CONF_TRACK_BATTERY_INTERVAL]
else: else:
battery_track_interval = timedelta(0) battery_track_interval = timedelta(0)
def see_device(address, name, new_device=False, battery=None): yaml_path = hass.config.path(YAML_DEVICES)
devs_to_track: set[str] = set()
devs_no_track: set[str] = set()
devs_track_battery = {}
interval: timedelta = config.get(CONF_SCAN_INTERVAL, SCAN_INTERVAL)
# if track new devices is true discover new devices
# on every scan.
track_new = config.get(CONF_TRACK_NEW)
async def async_see_device(address, name, new_device=False, battery=None):
"""Mark a device as seen.""" """Mark a device as seen."""
if name is not None: if name is not None:
name = name.strip("\x00") name = name.strip("\x00")
@ -95,7 +94,7 @@ def setup_scanner( # noqa: C901
if new_devices[address]["seen"] < MIN_SEEN_NEW: if new_devices[address]["seen"] < MIN_SEEN_NEW:
return return
_LOGGER.debug("Adding %s to tracked devices", address) _LOGGER.debug("Adding %s to tracked devices", address)
devs_to_track.append(address) devs_to_track.add(address)
if battery_track_interval > timedelta(0): if battery_track_interval > timedelta(0):
devs_track_battery[address] = dt_util.as_utc( devs_track_battery[address] = dt_util.as_utc(
datetime.fromtimestamp(0) datetime.fromtimestamp(0)
@ -105,109 +104,113 @@ def setup_scanner( # noqa: C901
new_devices[address] = {"seen": 1, "name": name} new_devices[address] = {"seen": 1, "name": name}
return return
see( await async_see(
mac=BLE_PREFIX + address, mac=BLE_PREFIX + address,
host_name=name, host_name=name,
source_type=SOURCE_TYPE_BLUETOOTH_LE, source_type=SOURCE_TYPE_BLUETOOTH_LE,
battery=battery, battery=battery,
) )
def discover_ble_devices():
"""Discover Bluetooth LE devices."""
_LOGGER.debug("Discovering Bluetooth LE devices")
try:
adapter = pygatt.GATTToolBackend()
hass.data[DATA_BLE][DATA_BLE_ADAPTER] = adapter
devs = adapter.scan()
devices = {x["address"]: x["name"] for x in devs}
_LOGGER.debug("Bluetooth LE devices discovered = %s", devices)
except (RuntimeError, pygatt.exceptions.BLEError) as error:
_LOGGER.error("Error during Bluetooth LE scan: %s", error)
return {}
return devices
yaml_path = hass.config.path(YAML_DEVICES)
devs_to_track = []
devs_donot_track = []
devs_track_battery = {}
# Load all known devices. # Load all known devices.
# We just need the devices so set consider_home and home range # We just need the devices so set consider_home and home range
# to 0 # to 0
for device in asyncio.run_coroutine_threadsafe( for device in await async_load_config(yaml_path, hass, timedelta(0)):
async_load_config(yaml_path, hass, timedelta(0)), hass.loop
).result():
# check if device is a valid bluetooth device # check if device is a valid bluetooth device
if device.mac and device.mac[:4].upper() == BLE_PREFIX: if device.mac and device.mac[:4].upper() == BLE_PREFIX:
address = device.mac[4:] address = device.mac[4:]
if device.track: if device.track:
_LOGGER.debug("Adding %s to BLE tracker", device.mac) _LOGGER.debug("Adding %s to BLE tracker", device.mac)
devs_to_track.append(address) devs_to_track.add(address)
if battery_track_interval > timedelta(0): if battery_track_interval > timedelta(0):
devs_track_battery[address] = dt_util.as_utc( devs_track_battery[address] = dt_util.as_utc(
datetime.fromtimestamp(0) datetime.fromtimestamp(0)
) )
else: else:
_LOGGER.debug("Adding %s to BLE do not track", device.mac) _LOGGER.debug("Adding %s to BLE do not track", device.mac)
devs_donot_track.append(address) devs_no_track.add(address)
# if track new devices is true discover new devices
# on every scan.
track_new = config.get(CONF_TRACK_NEW)
if not devs_to_track and not track_new: if not devs_to_track and not track_new:
_LOGGER.warning("No Bluetooth LE devices to track!") _LOGGER.warning("No Bluetooth LE devices to track!")
return False return False
interval = config.get(CONF_SCAN_INTERVAL, SCAN_INTERVAL) async def _async_see_update_ble_battery(
mac: str,
def update_ble(now): now: datetime,
service_info: bluetooth.BluetoothServiceInfo,
) -> None:
"""Lookup Bluetooth LE devices and update status.""" """Lookup Bluetooth LE devices and update status."""
devs = discover_ble_devices()
if devs_track_battery:
adapter = hass.data[DATA_BLE][DATA_BLE_ADAPTER]
for mac in devs_to_track:
if mac not in devs:
continue
if devs[mac] is None:
devs[mac] = mac
battery = None battery = None
if ( ble_device: BLEDevice | str = (
mac in devs_track_battery bluetooth.async_ble_device_from_address(hass, mac) or mac
and now > devs_track_battery[mac] + battery_track_interval )
):
handle = None
try: try:
adapter.start(reset_on_start=False) async with BleakClient(ble_device) as client:
_LOGGER.debug("Reading battery for Bluetooth LE device %s", mac) bat_char = await client.read_gatt_char(BATTERY_CHARACTERISTIC_UUID)
bt_device = adapter.connect(mac) battery = ord(bat_char)
# Try to get the handle; it will raise a BLEError exception if not available except asyncio.TimeoutError:
handle = bt_device.get_handle(BATTERY_CHARACTERISTIC_UUID) _LOGGER.warning(
battery = ord(bt_device.char_read(BATTERY_CHARACTERISTIC_UUID)) "Timeout when trying to get battery status for %s", service_info.name
devs_track_battery[mac] = now )
except pygatt.exceptions.NotificationTimeout: # Bleak currently has a few places where checking dbus attributes
_LOGGER.warning("Timeout when trying to get battery status") # can raise when there is another error. We need to trap AttributeError
except pygatt.exceptions.BLEError as err: # until bleak releases v0.15+ which resolves these.
_LOGGER.warning("Could not read battery status: %s", err) except (AttributeError, BleakError) as err:
if handle is not None: _LOGGER.debug("Could not read battery status: %s", err)
# If the device does not offer battery information, there is no point in asking again later on. # If the device does not offer battery information, there is no point in asking again later on.
# Remove the device from the battery-tracked devices, so that their battery is not wasted # Remove the device from the battery-tracked devices, so that their battery is not wasted
# trying to get an unavailable information. # trying to get an unavailable information.
del devs_track_battery[mac] del devs_track_battery[mac]
finally: if battery:
adapter.stop() await async_see_device(mac, service_info.name, battery=battery)
see_device(mac, devs[mac], battery=battery)
@callback
def _async_update_ble(
service_info: bluetooth.BluetoothServiceInfo, change: bluetooth.BluetoothChange
) -> None:
"""Update from a ble callback."""
mac = service_info.address
if mac in devs_to_track:
now = dt_util.utcnow()
hass.async_create_task(async_see_device(mac, service_info.name))
if (
mac in devs_track_battery
and now > devs_track_battery[mac] + battery_track_interval
):
devs_track_battery[mac] = now
asyncio.create_task(
_async_see_update_ble_battery(mac, now, service_info)
)
if track_new: if track_new:
for address in devs: if mac not in devs_to_track and mac not in devs_no_track:
if address not in devs_to_track and address not in devs_donot_track: _LOGGER.info("Discovered Bluetooth LE device %s", mac)
_LOGGER.info("Discovered Bluetooth LE device %s", address) hass.async_create_task(
see_device(address, devs[address], new_device=True) async_see_device(mac, service_info.name, new_device=True)
)
track_point_in_utc_time(hass, update_ble, dt_util.utcnow() + interval) @callback
def _async_refresh_ble(now: datetime) -> None:
"""Refresh BLE devices from the discovered service info."""
# Make sure devices are seen again at the scheduled
# interval so they do not get set to not_home when
# there have been no callbacks because the RSSI or
# other properties have not changed.
for service_info in bluetooth.async_discovered_service_info(hass):
_async_update_ble(service_info, bluetooth.BluetoothChange.ADVERTISEMENT)
cancels = [
bluetooth.async_register_callback(hass, _async_update_ble, None),
async_track_time_interval(hass, _async_refresh_ble, interval),
]
@callback
def _async_handle_stop(event: Event) -> None:
"""Cancel the callback."""
for cancel in cancels:
cancel()
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _async_handle_stop)
_async_refresh_ble(dt_util.now())
update_ble(dt_util.utcnow())
return True return True

View File

@ -2,8 +2,8 @@
"domain": "bluetooth_le_tracker", "domain": "bluetooth_le_tracker",
"name": "Bluetooth LE Tracker", "name": "Bluetooth LE Tracker",
"documentation": "https://www.home-assistant.io/integrations/bluetooth_le_tracker", "documentation": "https://www.home-assistant.io/integrations/bluetooth_le_tracker",
"requirements": ["pygatt[GATTTOOL]==4.0.5"], "dependencies": ["bluetooth"],
"codeowners": [], "codeowners": [],
"iot_class": "local_polling", "iot_class": "local_push",
"loggers": ["pygatt"] "loggers": []
} }

View File

@ -1530,7 +1530,6 @@ pyfronius==0.7.1
# homeassistant.components.ifttt # homeassistant.components.ifttt
pyfttt==0.3 pyfttt==0.3
# homeassistant.components.bluetooth_le_tracker
# homeassistant.components.skybeacon # homeassistant.components.skybeacon
pygatt[GATTTOOL]==4.0.5 pygatt[GATTTOOL]==4.0.5

View File

@ -1042,10 +1042,6 @@ pyfronius==0.7.1
# homeassistant.components.ifttt # homeassistant.components.ifttt
pyfttt==0.3 pyfttt==0.3
# homeassistant.components.bluetooth_le_tracker
# homeassistant.components.skybeacon
pygatt[GATTTOOL]==4.0.5
# homeassistant.components.hvv_departures # homeassistant.components.hvv_departures
pygti==0.9.2 pygti==0.9.2

View File

@ -0,0 +1,7 @@
"""Tests for the bluetooth_le_tracker component."""
import pytest
@pytest.fixture(autouse=True)
def bluetooth_le_tracker_auto_mock_bluetooth(mock_bluetooth):
"""Mock the bluetooth integration scanner."""

View File

@ -1,9 +1,17 @@
"""Test Bluetooth LE device tracker.""" """Test Bluetooth LE device tracker."""
import asyncio
from datetime import timedelta from datetime import timedelta
from unittest.mock import patch from unittest.mock import patch
from bleak import BleakError
from homeassistant.components.bluetooth import BluetoothServiceInfo
from homeassistant.components.bluetooth_le_tracker import device_tracker from homeassistant.components.bluetooth_le_tracker import device_tracker
from homeassistant.components.bluetooth_le_tracker.device_tracker import (
CONF_TRACK_BATTERY,
CONF_TRACK_BATTERY_INTERVAL,
)
from homeassistant.components.device_tracker.const import ( from homeassistant.components.device_tracker.const import (
CONF_SCAN_INTERVAL, CONF_SCAN_INTERVAL,
CONF_TRACK_NEW, CONF_TRACK_NEW,
@ -16,7 +24,49 @@ from homeassistant.util import dt as dt_util, slugify
from tests.common import async_fire_time_changed from tests.common import async_fire_time_changed
async def test_preserve_new_tracked_device_name(hass, mock_device_tracker_conf): class MockBleakClient:
"""Mock BleakClient."""
def __init__(self, *args, **kwargs):
"""Mock BleakClient."""
pass
async def __aenter__(self, *args, **kwargs):
"""Mock BleakClient.__aenter__."""
return self
async def __aexit__(self, *args, **kwargs):
"""Mock BleakClient.__aexit__."""
pass
class MockBleakClientTimesOut(MockBleakClient):
"""Mock BleakClient that times out."""
async def read_gatt_char(self, *args, **kwargs):
"""Mock BleakClient.read_gatt_char."""
raise asyncio.TimeoutError
class MockBleakClientFailing(MockBleakClient):
"""Mock BleakClient that fails."""
async def read_gatt_char(self, *args, **kwargs):
"""Mock BleakClient.read_gatt_char."""
raise BleakError("Failed")
class MockBleakClientBattery5(MockBleakClient):
"""Mock BleakClient that returns a battery level of 5."""
async def read_gatt_char(self, *args, **kwargs):
"""Mock BleakClient.read_gatt_char."""
return b"\x05"
async def test_preserve_new_tracked_device_name(
hass, mock_bluetooth, mock_device_tracker_conf
):
"""Test preserving tracked device name across new seens.""" """Test preserving tracked device name across new seens."""
address = "DE:AD:BE:EF:13:37" address = "DE:AD:BE:EF:13:37"
@ -24,13 +74,22 @@ async def test_preserve_new_tracked_device_name(hass, mock_device_tracker_conf):
entity_id = f"{DOMAIN}.{slugify(name)}" entity_id = f"{DOMAIN}.{slugify(name)}"
with patch( with patch(
"homeassistant.components." "homeassistant.components.bluetooth.async_discovered_service_info"
"bluetooth_le_tracker.device_tracker.pygatt.GATTToolBackend" ) as mock_async_discovered_service_info, patch.object(
) as mock_backend, patch.object(device_tracker, "MIN_SEEN_NEW", 3): device_tracker, "MIN_SEEN_NEW", 3
):
device = BluetoothServiceInfo(
name=name,
address=address,
rssi=-19,
manufacturer_data={},
service_data={},
service_uuids=[],
source="local",
)
# Return with name when seen first time # Return with name when seen first time
device = {"address": address, "name": name} mock_async_discovered_service_info.return_value = [device]
mock_backend.return_value.scan.return_value = [device]
config = { config = {
CONF_PLATFORM: "bluetooth_le_tracker", CONF_PLATFORM: "bluetooth_le_tracker",
@ -41,7 +100,17 @@ async def test_preserve_new_tracked_device_name(hass, mock_device_tracker_conf):
assert result assert result
# Seen once here; return without name when seen subsequent times # Seen once here; return without name when seen subsequent times
device["name"] = None device = BluetoothServiceInfo(
name=None,
address=address,
rssi=-19,
manufacturer_data={},
service_data={},
service_uuids=[],
source="local",
)
# Return with name when seen first time
mock_async_discovered_service_info.return_value = [device]
# Tick until device seen enough times for to be registered for tracking # Tick until device seen enough times for to be registered for tracking
for _ in range(device_tracker.MIN_SEEN_NEW - 1): for _ in range(device_tracker.MIN_SEEN_NEW - 1):
@ -54,3 +123,193 @@ async def test_preserve_new_tracked_device_name(hass, mock_device_tracker_conf):
state = hass.states.get(entity_id) state = hass.states.get(entity_id)
assert state assert state
assert state.name == name assert state.name == name
async def test_tracking_battery_times_out(
hass, mock_bluetooth, mock_device_tracker_conf
):
"""Test tracking the battery times out."""
address = "DE:AD:BE:EF:13:37"
name = "Mock device name"
entity_id = f"{DOMAIN}.{slugify(name)}"
with patch(
"homeassistant.components.bluetooth.async_discovered_service_info"
) as mock_async_discovered_service_info, patch.object(
device_tracker, "MIN_SEEN_NEW", 3
):
device = BluetoothServiceInfo(
name=name,
address=address,
rssi=-19,
manufacturer_data={},
service_data={},
service_uuids=[],
source="local",
)
# Return with name when seen first time
mock_async_discovered_service_info.return_value = [device]
config = {
CONF_PLATFORM: "bluetooth_le_tracker",
CONF_SCAN_INTERVAL: timedelta(minutes=1),
CONF_TRACK_BATTERY: True,
CONF_TRACK_BATTERY_INTERVAL: timedelta(minutes=2),
CONF_TRACK_NEW: True,
}
result = await async_setup_component(hass, DOMAIN, {DOMAIN: config})
assert result
# Tick until device seen enough times for to be registered for tracking
for _ in range(device_tracker.MIN_SEEN_NEW - 1):
async_fire_time_changed(
hass,
dt_util.utcnow() + config[CONF_SCAN_INTERVAL] + timedelta(seconds=1),
)
await hass.async_block_till_done()
with patch(
"homeassistant.components.bluetooth_le_tracker.device_tracker.BleakClient",
MockBleakClientTimesOut,
):
# Wait for the battery scan
async_fire_time_changed(
hass,
dt_util.utcnow()
+ config[CONF_SCAN_INTERVAL]
+ timedelta(seconds=1)
+ timedelta(minutes=2),
)
await hass.async_block_till_done()
state = hass.states.get(entity_id)
assert state.name == name
assert "battery" not in state.attributes
async def test_tracking_battery_fails(hass, mock_bluetooth, mock_device_tracker_conf):
"""Test tracking the battery fails."""
address = "DE:AD:BE:EF:13:37"
name = "Mock device name"
entity_id = f"{DOMAIN}.{slugify(name)}"
with patch(
"homeassistant.components.bluetooth.async_discovered_service_info"
) as mock_async_discovered_service_info, patch.object(
device_tracker, "MIN_SEEN_NEW", 3
):
device = BluetoothServiceInfo(
name=name,
address=address,
rssi=-19,
manufacturer_data={},
service_data={},
service_uuids=[],
source="local",
)
# Return with name when seen first time
mock_async_discovered_service_info.return_value = [device]
config = {
CONF_PLATFORM: "bluetooth_le_tracker",
CONF_SCAN_INTERVAL: timedelta(minutes=1),
CONF_TRACK_BATTERY: True,
CONF_TRACK_BATTERY_INTERVAL: timedelta(minutes=2),
CONF_TRACK_NEW: True,
}
result = await async_setup_component(hass, DOMAIN, {DOMAIN: config})
assert result
# Tick until device seen enough times for to be registered for tracking
for _ in range(device_tracker.MIN_SEEN_NEW - 1):
async_fire_time_changed(
hass,
dt_util.utcnow() + config[CONF_SCAN_INTERVAL] + timedelta(seconds=1),
)
await hass.async_block_till_done()
with patch(
"homeassistant.components.bluetooth_le_tracker.device_tracker.BleakClient",
MockBleakClientFailing,
):
# Wait for the battery scan
async_fire_time_changed(
hass,
dt_util.utcnow()
+ config[CONF_SCAN_INTERVAL]
+ timedelta(seconds=1)
+ timedelta(minutes=2),
)
await hass.async_block_till_done()
state = hass.states.get(entity_id)
assert state.name == name
assert "battery" not in state.attributes
async def test_tracking_battery_successful(
hass, mock_bluetooth, mock_device_tracker_conf
):
"""Test tracking the battery gets a value."""
address = "DE:AD:BE:EF:13:37"
name = "Mock device name"
entity_id = f"{DOMAIN}.{slugify(name)}"
with patch(
"homeassistant.components.bluetooth.async_discovered_service_info"
) as mock_async_discovered_service_info, patch.object(
device_tracker, "MIN_SEEN_NEW", 3
):
device = BluetoothServiceInfo(
name=name,
address=address,
rssi=-19,
manufacturer_data={},
service_data={},
service_uuids=[],
source="local",
)
# Return with name when seen first time
mock_async_discovered_service_info.return_value = [device]
config = {
CONF_PLATFORM: "bluetooth_le_tracker",
CONF_SCAN_INTERVAL: timedelta(minutes=1),
CONF_TRACK_BATTERY: True,
CONF_TRACK_BATTERY_INTERVAL: timedelta(minutes=2),
CONF_TRACK_NEW: True,
}
result = await async_setup_component(hass, DOMAIN, {DOMAIN: config})
assert result
# Tick until device seen enough times for to be registered for tracking
for _ in range(device_tracker.MIN_SEEN_NEW - 1):
async_fire_time_changed(
hass,
dt_util.utcnow() + config[CONF_SCAN_INTERVAL] + timedelta(seconds=1),
)
await hass.async_block_till_done()
with patch(
"homeassistant.components.bluetooth_le_tracker.device_tracker.BleakClient",
MockBleakClientBattery5,
):
# Wait for the battery scan
async_fire_time_changed(
hass,
dt_util.utcnow()
+ config[CONF_SCAN_INTERVAL]
+ timedelta(seconds=1)
+ timedelta(minutes=2),
)
await hass.async_block_till_done()
state = hass.states.get(entity_id)
assert state.name == name
assert state.attributes["battery"] == 5