Implement an active update coordinator for polling ble devices that get data from both advertisements and active connections (#84207)

This commit is contained in:
J. Nick Koston 2022-12-19 16:09:29 -10:00 committed by GitHub
parent 7f8c4293b4
commit 53363cf7e8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 549 additions and 2 deletions

View File

@ -0,0 +1,164 @@
"""A Bluetooth passive coordinator that receives data from advertisements but can also poll."""
from __future__ import annotations
from collections.abc import Callable, Coroutine
import logging
from typing import Any, Generic, TypeVar
from bleak import BleakError
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.debounce import Debouncer
from homeassistant.util.dt import monotonic_time_coarse
from . import BluetoothChange, BluetoothScanningMode, BluetoothServiceInfoBleak
from .passive_update_coordinator import PassiveBluetoothDataUpdateCoordinator
POLL_DEFAULT_COOLDOWN = 10
POLL_DEFAULT_IMMEDIATE = True
_T = TypeVar("_T")
class ActiveBluetoothDataUpdateCoordinator(
Generic[_T], PassiveBluetoothDataUpdateCoordinator
):
"""
A coordinator that receives passive data from advertisements but can also poll.
Unlike the passive processor coordinator, this coordinator does call a parser
method to parse the data from the advertisement.
Every time an advertisement is received, needs_poll_method is called to work
out if a poll is needed. This should return True if it is and False if it is
not needed.
def needs_poll_method(svc_info: BluetoothServiceInfoBleak, last_poll: float | None) -> bool:
return True
If there has been no poll since HA started, `last_poll` will be None. Otherwise it is
the number of seconds since one was last attempted.
If a poll is needed, the coordinator will call poll_method. This is a coroutine.
It should return the same type of data as your update_method. The expectation is that
data from advertisements and from polling are being parsed and fed into a shared
object that represents the current state of the device.
async def poll_method(svc_info: BluetoothServiceInfoBleak) -> YourDataType:
return YourDataType(....)
BluetoothServiceInfoBleak.device contains a BLEDevice. You should use this in
your poll function, as it is the most efficient way to get a BleakClient.
Once the poll is complete, the coordinator will call _async_handle_bluetooth_poll
which needs to be implemented in the subclass.
"""
def __init__(
self,
hass: HomeAssistant,
logger: logging.Logger,
*,
address: str,
mode: BluetoothScanningMode,
needs_poll_method: Callable[[BluetoothServiceInfoBleak, float | None], bool],
poll_method: Callable[
[BluetoothServiceInfoBleak],
Coroutine[Any, Any, _T],
]
| None = None,
poll_debouncer: Debouncer[Coroutine[Any, Any, None]] | None = None,
connectable: bool = True,
) -> None:
"""Initialize the coordinator."""
super().__init__(hass, logger, address, mode, connectable)
# It's None before the first successful update.
# Set type to just T to remove annoying checks that data is not None
# when it was already checked during setup.
self.data: _T = None # type: ignore[assignment]
self._needs_poll_method = needs_poll_method
self._poll_method = poll_method
self._last_poll: float | None = None
self.last_poll_successful = True
# We keep the last service info in case the poller needs to refer to
# e.g. its BLEDevice
self._last_service_info: BluetoothServiceInfoBleak | None = None
if poll_debouncer is None:
poll_debouncer = Debouncer(
hass,
logger,
cooldown=POLL_DEFAULT_COOLDOWN,
immediate=POLL_DEFAULT_IMMEDIATE,
function=self._async_poll,
)
else:
poll_debouncer.function = self._async_poll
self._debounced_poll = poll_debouncer
def needs_poll(self, service_info: BluetoothServiceInfoBleak) -> bool:
"""Return true if time to try and poll."""
poll_age: float | None = None
if self._last_poll:
poll_age = monotonic_time_coarse() - self._last_poll
return self._needs_poll_method(service_info, poll_age)
async def _async_poll_data(
self, last_service_info: BluetoothServiceInfoBleak
) -> _T:
"""Fetch the latest data from the source."""
if self._poll_method is None:
raise NotImplementedError("Poll method not implemented")
return await self._poll_method(last_service_info)
async def _async_poll(self) -> None:
"""Poll the device to retrieve any extra data."""
assert self._last_service_info
try:
self.data = await self._async_poll_data(self._last_service_info)
except BleakError as exc:
if self.last_poll_successful:
self.logger.error(
"%s: Bluetooth error whilst polling: %s", self.address, str(exc)
)
self.last_poll_successful = False
return
except Exception: # pylint: disable=broad-except
if self.last_poll_successful:
self.logger.exception("%s: Failure while polling", self.address)
self.last_poll_successful = False
return
finally:
self._last_poll = monotonic_time_coarse()
if not self.last_poll_successful:
self.logger.debug("%s: Polling recovered")
self.last_poll_successful = True
self._async_handle_bluetooth_poll()
@callback
def _async_handle_bluetooth_poll(self) -> None:
"""Handle a poll event."""
self.async_update_listeners()
@callback
def _async_handle_bluetooth_event(
self,
service_info: BluetoothServiceInfoBleak,
change: BluetoothChange,
) -> None:
"""Handle a Bluetooth event."""
super()._async_handle_bluetooth_event(service_info, change)
self._last_service_info = service_info
# See if its time to poll
# We use bluetooth events to trigger the poll so that we scan as soon as
# possible after a device comes online or back in range, if a poll is due
if self.needs_poll(service_info):
self.hass.async_create_task(self._debounced_poll.async_call())

View File

@ -1,4 +1,4 @@
"""A Bluetooth passive coordinator that collects data from advertisements but can also poll."""
"""A Bluetooth passive processor coordinator that collects data from advertisements but can also poll."""
from __future__ import annotations
from collections.abc import Callable, Coroutine
@ -24,7 +24,7 @@ class ActiveBluetoothProcessorCoordinator(
Generic[_T], PassiveBluetoothProcessorCoordinator[_T]
):
"""
A coordinator that parses passive data from advertisements but can also poll.
A processor coordinator that parses passive data from advertisements but can also poll.
Every time an advertisement is received, needs_poll_method is called to work
out if a poll is needed. This should return True if it is and False if it is

View File

@ -0,0 +1,383 @@
"""Tests for the Bluetooth integration ActiveBluetoothDataUpdateCoordinator."""
from __future__ import annotations
import asyncio
from collections.abc import Callable, Coroutine
import logging
from typing import Any
from unittest.mock import MagicMock
from bleak.exc import BleakError
from homeassistant.components.bluetooth import (
DOMAIN,
BluetoothChange,
BluetoothScanningMode,
BluetoothServiceInfoBleak,
)
from homeassistant.components.bluetooth.active_update_coordinator import (
_T,
ActiveBluetoothDataUpdateCoordinator,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers.debounce import Debouncer
from homeassistant.helpers.service_info.bluetooth import BluetoothServiceInfo
from homeassistant.setup import async_setup_component
from . import inject_bluetooth_service_info
_LOGGER = logging.getLogger(__name__)
GENERIC_BLUETOOTH_SERVICE_INFO = BluetoothServiceInfo(
name="Generic",
address="aa:bb:cc:dd:ee:ff",
rssi=-95,
manufacturer_data={
1: b"\x01\x01\x01\x01\x01\x01\x01\x01",
},
service_data={},
service_uuids=[],
source="local",
)
GENERIC_BLUETOOTH_SERVICE_INFO_2 = BluetoothServiceInfo(
name="Generic",
address="aa:bb:cc:dd:ee:ff",
rssi=-95,
manufacturer_data={
2: b"\x01\x01\x01\x01\x01\x01\x01\x01",
},
service_data={},
service_uuids=[],
source="local",
)
class MyCoordinator(ActiveBluetoothDataUpdateCoordinator[dict[str, Any]]):
"""An example coordinator that subclasses ActiveBluetoothDataUpdateCoordinator."""
def __init__(
self,
hass: HomeAssistant,
logger: logging.Logger,
*,
address: str,
mode: BluetoothScanningMode,
needs_poll_method: Callable[[BluetoothServiceInfoBleak, float | None], bool],
poll_method: Callable[
[BluetoothServiceInfoBleak],
Coroutine[Any, Any, _T],
]
| None = None,
poll_debouncer: Debouncer[Coroutine[Any, Any, None]] | None = None,
connectable: bool = True,
) -> None:
"""Initialize the coordinator."""
self.passive_data: dict[str, Any] = {}
super().__init__(
hass=hass,
logger=logger,
address=address,
mode=mode,
needs_poll_method=needs_poll_method,
poll_method=poll_method,
poll_debouncer=poll_debouncer,
connectable=connectable,
)
def _async_handle_bluetooth_event(
self,
service_info: BluetoothServiceInfo,
change: BluetoothChange,
) -> None:
"""Handle a Bluetooth event."""
self.passive_data = {"rssi": service_info.rssi}
super()._async_handle_bluetooth_event(service_info, change)
async def test_basic_usage(hass, mock_bleak_scanner_start, mock_bluetooth_adapters):
"""Test basic usage of the ActiveBluetoothDataUpdateCoordinator."""
await async_setup_component(hass, DOMAIN, {DOMAIN: {}})
def _needs_poll(
service_info: BluetoothServiceInfoBleak, seconds_since_last_poll: float | None
) -> bool:
return True
async def _poll_method(service_info: BluetoothServiceInfoBleak) -> dict[str, Any]:
return {"fake": "data"}
coordinator = MyCoordinator(
hass=hass,
logger=_LOGGER,
address="aa:bb:cc:dd:ee:ff",
mode=BluetoothScanningMode.ACTIVE,
needs_poll_method=_needs_poll,
poll_method=_poll_method,
)
assert coordinator.available is False # no data yet
mock_listener = MagicMock()
unregister_listener = coordinator.async_add_listener(mock_listener)
cancel = coordinator.async_start()
inject_bluetooth_service_info(hass, GENERIC_BLUETOOTH_SERVICE_INFO)
await hass.async_block_till_done()
assert coordinator.passive_data == {"rssi": GENERIC_BLUETOOTH_SERVICE_INFO.rssi}
assert coordinator.data == {"fake": "data"}
cancel()
unregister_listener()
async def test_bleak_error_during_polling(
hass, mock_bleak_scanner_start, mock_bluetooth_adapters
):
"""Test bleak error during polling ActiveBluetoothDataUpdateCoordinator."""
await async_setup_component(hass, DOMAIN, {DOMAIN: {}})
poll_count = 0
def _needs_poll(
service_info: BluetoothServiceInfoBleak, seconds_since_last_poll: float | None
) -> bool:
return True
async def _poll_method(service_info: BluetoothServiceInfoBleak) -> dict[str, Any]:
nonlocal poll_count
poll_count += 1
if poll_count == 1:
raise BleakError("fake bleak error")
return {"fake": "data"}
coordinator = MyCoordinator(
hass=hass,
logger=_LOGGER,
address="aa:bb:cc:dd:ee:ff",
mode=BluetoothScanningMode.ACTIVE,
needs_poll_method=_needs_poll,
poll_method=_poll_method,
poll_debouncer=Debouncer(hass, _LOGGER, cooldown=0, immediate=True),
)
assert coordinator.available is False # no data yet
mock_listener = MagicMock()
unregister_listener = coordinator.async_add_listener(mock_listener)
cancel = coordinator.async_start()
inject_bluetooth_service_info(hass, GENERIC_BLUETOOTH_SERVICE_INFO)
await hass.async_block_till_done()
assert coordinator.passive_data == {"rssi": GENERIC_BLUETOOTH_SERVICE_INFO.rssi}
assert coordinator.data is None
assert coordinator.last_poll_successful is False
inject_bluetooth_service_info(hass, GENERIC_BLUETOOTH_SERVICE_INFO_2)
await hass.async_block_till_done()
assert coordinator.passive_data == {"rssi": GENERIC_BLUETOOTH_SERVICE_INFO_2.rssi}
assert coordinator.data == {"fake": "data"}
assert coordinator.last_poll_successful is True
cancel()
unregister_listener()
async def test_generic_exception_during_polling(
hass, mock_bleak_scanner_start, mock_bluetooth_adapters
):
"""Test generic exception during polling ActiveBluetoothDataUpdateCoordinator."""
await async_setup_component(hass, DOMAIN, {DOMAIN: {}})
poll_count = 0
def _needs_poll(
service_info: BluetoothServiceInfoBleak, seconds_since_last_poll: float | None
) -> bool:
return True
async def _poll_method(service_info: BluetoothServiceInfoBleak) -> dict[str, Any]:
nonlocal poll_count
poll_count += 1
if poll_count == 1:
raise ValueError("fake error")
return {"fake": "data"}
coordinator = MyCoordinator(
hass=hass,
logger=_LOGGER,
address="aa:bb:cc:dd:ee:ff",
mode=BluetoothScanningMode.ACTIVE,
needs_poll_method=_needs_poll,
poll_method=_poll_method,
poll_debouncer=Debouncer(hass, _LOGGER, cooldown=0, immediate=True),
)
assert coordinator.available is False # no data yet
mock_listener = MagicMock()
unregister_listener = coordinator.async_add_listener(mock_listener)
cancel = coordinator.async_start()
inject_bluetooth_service_info(hass, GENERIC_BLUETOOTH_SERVICE_INFO)
await hass.async_block_till_done()
assert coordinator.passive_data == {"rssi": GENERIC_BLUETOOTH_SERVICE_INFO.rssi}
assert coordinator.data is None
assert coordinator.last_poll_successful is False
inject_bluetooth_service_info(hass, GENERIC_BLUETOOTH_SERVICE_INFO_2)
await hass.async_block_till_done()
assert coordinator.passive_data == {"rssi": GENERIC_BLUETOOTH_SERVICE_INFO_2.rssi}
assert coordinator.data == {"fake": "data"}
assert coordinator.last_poll_successful is True
cancel()
unregister_listener()
async def test_polling_debounce(
hass, mock_bleak_scanner_start, mock_bluetooth_adapters
):
"""Test basic usage of the ActiveBluetoothDataUpdateCoordinator."""
await async_setup_component(hass, DOMAIN, {DOMAIN: {}})
poll_count = 0
def _needs_poll(
service_info: BluetoothServiceInfoBleak, seconds_since_last_poll: float | None
) -> bool:
return True
async def _poll_method(service_info: BluetoothServiceInfoBleak) -> dict[str, Any]:
nonlocal poll_count
poll_count += 1
await asyncio.sleep(0.0001)
return {"poll_count": poll_count}
coordinator = MyCoordinator(
hass=hass,
logger=_LOGGER,
address="aa:bb:cc:dd:ee:ff",
mode=BluetoothScanningMode.ACTIVE,
needs_poll_method=_needs_poll,
poll_method=_poll_method,
)
assert coordinator.available is False # no data yet
mock_listener = MagicMock()
unregister_listener = coordinator.async_add_listener(mock_listener)
cancel = coordinator.async_start()
inject_bluetooth_service_info(hass, GENERIC_BLUETOOTH_SERVICE_INFO)
inject_bluetooth_service_info(hass, GENERIC_BLUETOOTH_SERVICE_INFO_2)
await hass.async_block_till_done()
assert coordinator.passive_data == {"rssi": GENERIC_BLUETOOTH_SERVICE_INFO.rssi}
# We should only get one poll because of the debounce
assert coordinator.data == {"poll_count": 1}
cancel()
unregister_listener()
async def test_polling_debounce_with_custom_debouncer(
hass, mock_bleak_scanner_start, mock_bluetooth_adapters
):
"""Test basic usage of the ActiveBluetoothDataUpdateCoordinator."""
await async_setup_component(hass, DOMAIN, {DOMAIN: {}})
poll_count = 0
def _needs_poll(
service_info: BluetoothServiceInfoBleak, seconds_since_last_poll: float | None
) -> bool:
return True
async def _poll_method(service_info: BluetoothServiceInfoBleak) -> dict[str, Any]:
nonlocal poll_count
poll_count += 1
await asyncio.sleep(0.0001)
return {"poll_count": poll_count}
coordinator = MyCoordinator(
hass=hass,
logger=_LOGGER,
address="aa:bb:cc:dd:ee:ff",
mode=BluetoothScanningMode.ACTIVE,
needs_poll_method=_needs_poll,
poll_method=_poll_method,
poll_debouncer=Debouncer(hass, _LOGGER, cooldown=0.1, immediate=True),
)
assert coordinator.available is False # no data yet
mock_listener = MagicMock()
unregister_listener = coordinator.async_add_listener(mock_listener)
cancel = coordinator.async_start()
inject_bluetooth_service_info(hass, GENERIC_BLUETOOTH_SERVICE_INFO)
inject_bluetooth_service_info(hass, GENERIC_BLUETOOTH_SERVICE_INFO_2)
await hass.async_block_till_done()
assert coordinator.passive_data == {"rssi": GENERIC_BLUETOOTH_SERVICE_INFO.rssi}
# We should only get one poll because of the debounce
assert coordinator.data == {"poll_count": 1}
cancel()
unregister_listener()
async def test_polling_rejecting_the_first_time(
hass, mock_bleak_scanner_start, mock_bluetooth_adapters
):
"""Test need_poll rejects the first time ActiveBluetoothDataUpdateCoordinator."""
await async_setup_component(hass, DOMAIN, {DOMAIN: {}})
attempt = 0
def _needs_poll(
service_info: BluetoothServiceInfoBleak, seconds_since_last_poll: float | None
) -> bool:
nonlocal attempt
attempt += 1
return attempt != 1
async def _poll_method(service_info: BluetoothServiceInfoBleak) -> dict[str, Any]:
return {"fake": "data"}
coordinator = MyCoordinator(
hass=hass,
logger=_LOGGER,
address="aa:bb:cc:dd:ee:ff",
mode=BluetoothScanningMode.ACTIVE,
needs_poll_method=_needs_poll,
poll_method=_poll_method,
)
assert coordinator.available is False # no data yet
mock_listener = MagicMock()
unregister_listener = coordinator.async_add_listener(mock_listener)
cancel = coordinator.async_start()
inject_bluetooth_service_info(hass, GENERIC_BLUETOOTH_SERVICE_INFO)
await hass.async_block_till_done()
assert coordinator.passive_data == {"rssi": GENERIC_BLUETOOTH_SERVICE_INFO.rssi}
# First poll is rejected, so no data yet
assert coordinator.data is None
inject_bluetooth_service_info(hass, GENERIC_BLUETOOTH_SERVICE_INFO)
await hass.async_block_till_done()
assert coordinator.passive_data == {"rssi": GENERIC_BLUETOOTH_SERVICE_INFO.rssi}
# Data is the same so no poll check
assert coordinator.data is None
inject_bluetooth_service_info(hass, GENERIC_BLUETOOTH_SERVICE_INFO_2)
await hass.async_block_till_done()
assert coordinator.passive_data == {"rssi": GENERIC_BLUETOOTH_SERVICE_INFO_2.rssi}
# Data is different so poll is done
assert coordinator.data == {"fake": "data"}
inject_bluetooth_service_info(hass, GENERIC_BLUETOOTH_SERVICE_INFO)
await hass.async_block_till_done()
assert coordinator.passive_data == {"rssi": GENERIC_BLUETOOTH_SERVICE_INFO.rssi}
# Data is different again so poll is done
assert coordinator.data == {"fake": "data"}
cancel()
unregister_listener()