diff --git a/homeassistant/components/bluetooth/active_update_coordinator.py b/homeassistant/components/bluetooth/active_update_coordinator.py new file mode 100644 index 00000000000..c4d40d5eaeb --- /dev/null +++ b/homeassistant/components/bluetooth/active_update_coordinator.py @@ -0,0 +1,164 @@ +"""A Bluetooth passive coordinator that receives data from advertisements but can also poll.""" +from __future__ import annotations + +from collections.abc import Callable, Coroutine +import logging +from typing import Any, Generic, TypeVar + +from bleak import BleakError + +from homeassistant.core import HomeAssistant, callback +from homeassistant.helpers.debounce import Debouncer +from homeassistant.util.dt import monotonic_time_coarse + +from . import BluetoothChange, BluetoothScanningMode, BluetoothServiceInfoBleak +from .passive_update_coordinator import PassiveBluetoothDataUpdateCoordinator + +POLL_DEFAULT_COOLDOWN = 10 +POLL_DEFAULT_IMMEDIATE = True + +_T = TypeVar("_T") + + +class ActiveBluetoothDataUpdateCoordinator( + Generic[_T], PassiveBluetoothDataUpdateCoordinator +): + """ + A coordinator that receives passive data from advertisements but can also poll. + + Unlike the passive processor coordinator, this coordinator does call a parser + method to parse the data from the advertisement. + + Every time an advertisement is received, needs_poll_method is called to work + out if a poll is needed. This should return True if it is and False if it is + not needed. + + def needs_poll_method(svc_info: BluetoothServiceInfoBleak, last_poll: float | None) -> bool: + return True + + If there has been no poll since HA started, `last_poll` will be None. Otherwise it is + the number of seconds since one was last attempted. + + If a poll is needed, the coordinator will call poll_method. This is a coroutine. + It should return the same type of data as your update_method. The expectation is that + data from advertisements and from polling are being parsed and fed into a shared + object that represents the current state of the device. + + async def poll_method(svc_info: BluetoothServiceInfoBleak) -> YourDataType: + return YourDataType(....) + + BluetoothServiceInfoBleak.device contains a BLEDevice. You should use this in + your poll function, as it is the most efficient way to get a BleakClient. + + Once the poll is complete, the coordinator will call _async_handle_bluetooth_poll + which needs to be implemented in the subclass. + """ + + def __init__( + self, + hass: HomeAssistant, + logger: logging.Logger, + *, + address: str, + mode: BluetoothScanningMode, + needs_poll_method: Callable[[BluetoothServiceInfoBleak, float | None], bool], + poll_method: Callable[ + [BluetoothServiceInfoBleak], + Coroutine[Any, Any, _T], + ] + | None = None, + poll_debouncer: Debouncer[Coroutine[Any, Any, None]] | None = None, + connectable: bool = True, + ) -> None: + """Initialize the coordinator.""" + super().__init__(hass, logger, address, mode, connectable) + # It's None before the first successful update. + # Set type to just T to remove annoying checks that data is not None + # when it was already checked during setup. + self.data: _T = None # type: ignore[assignment] + + self._needs_poll_method = needs_poll_method + self._poll_method = poll_method + self._last_poll: float | None = None + self.last_poll_successful = True + + # We keep the last service info in case the poller needs to refer to + # e.g. its BLEDevice + self._last_service_info: BluetoothServiceInfoBleak | None = None + + if poll_debouncer is None: + poll_debouncer = Debouncer( + hass, + logger, + cooldown=POLL_DEFAULT_COOLDOWN, + immediate=POLL_DEFAULT_IMMEDIATE, + function=self._async_poll, + ) + else: + poll_debouncer.function = self._async_poll + + self._debounced_poll = poll_debouncer + + def needs_poll(self, service_info: BluetoothServiceInfoBleak) -> bool: + """Return true if time to try and poll.""" + poll_age: float | None = None + if self._last_poll: + poll_age = monotonic_time_coarse() - self._last_poll + return self._needs_poll_method(service_info, poll_age) + + async def _async_poll_data( + self, last_service_info: BluetoothServiceInfoBleak + ) -> _T: + """Fetch the latest data from the source.""" + if self._poll_method is None: + raise NotImplementedError("Poll method not implemented") + return await self._poll_method(last_service_info) + + async def _async_poll(self) -> None: + """Poll the device to retrieve any extra data.""" + assert self._last_service_info + + try: + self.data = await self._async_poll_data(self._last_service_info) + except BleakError as exc: + if self.last_poll_successful: + self.logger.error( + "%s: Bluetooth error whilst polling: %s", self.address, str(exc) + ) + self.last_poll_successful = False + return + except Exception: # pylint: disable=broad-except + if self.last_poll_successful: + self.logger.exception("%s: Failure while polling", self.address) + self.last_poll_successful = False + return + finally: + self._last_poll = monotonic_time_coarse() + + if not self.last_poll_successful: + self.logger.debug("%s: Polling recovered") + self.last_poll_successful = True + + self._async_handle_bluetooth_poll() + + @callback + def _async_handle_bluetooth_poll(self) -> None: + """Handle a poll event.""" + self.async_update_listeners() + + @callback + def _async_handle_bluetooth_event( + self, + service_info: BluetoothServiceInfoBleak, + change: BluetoothChange, + ) -> None: + """Handle a Bluetooth event.""" + super()._async_handle_bluetooth_event(service_info, change) + + self._last_service_info = service_info + + # See if its time to poll + # We use bluetooth events to trigger the poll so that we scan as soon as + # possible after a device comes online or back in range, if a poll is due + if self.needs_poll(service_info): + self.hass.async_create_task(self._debounced_poll.async_call()) diff --git a/homeassistant/components/bluetooth/active_update_processor.py b/homeassistant/components/bluetooth/active_update_processor.py index ab26a0260f3..e175fc665f4 100644 --- a/homeassistant/components/bluetooth/active_update_processor.py +++ b/homeassistant/components/bluetooth/active_update_processor.py @@ -1,4 +1,4 @@ -"""A Bluetooth passive coordinator that collects data from advertisements but can also poll.""" +"""A Bluetooth passive processor coordinator that collects data from advertisements but can also poll.""" from __future__ import annotations from collections.abc import Callable, Coroutine @@ -24,7 +24,7 @@ class ActiveBluetoothProcessorCoordinator( Generic[_T], PassiveBluetoothProcessorCoordinator[_T] ): """ - A coordinator that parses passive data from advertisements but can also poll. + A processor coordinator that parses passive data from advertisements but can also poll. Every time an advertisement is received, needs_poll_method is called to work out if a poll is needed. This should return True if it is and False if it is diff --git a/tests/components/bluetooth/test_active_update_coordinator.py b/tests/components/bluetooth/test_active_update_coordinator.py new file mode 100644 index 00000000000..a82de1e0e08 --- /dev/null +++ b/tests/components/bluetooth/test_active_update_coordinator.py @@ -0,0 +1,383 @@ +"""Tests for the Bluetooth integration ActiveBluetoothDataUpdateCoordinator.""" +from __future__ import annotations + +import asyncio +from collections.abc import Callable, Coroutine +import logging +from typing import Any +from unittest.mock import MagicMock + +from bleak.exc import BleakError + +from homeassistant.components.bluetooth import ( + DOMAIN, + BluetoothChange, + BluetoothScanningMode, + BluetoothServiceInfoBleak, +) +from homeassistant.components.bluetooth.active_update_coordinator import ( + _T, + ActiveBluetoothDataUpdateCoordinator, +) +from homeassistant.core import HomeAssistant +from homeassistant.helpers.debounce import Debouncer +from homeassistant.helpers.service_info.bluetooth import BluetoothServiceInfo +from homeassistant.setup import async_setup_component + +from . import inject_bluetooth_service_info + +_LOGGER = logging.getLogger(__name__) + + +GENERIC_BLUETOOTH_SERVICE_INFO = BluetoothServiceInfo( + name="Generic", + address="aa:bb:cc:dd:ee:ff", + rssi=-95, + manufacturer_data={ + 1: b"\x01\x01\x01\x01\x01\x01\x01\x01", + }, + service_data={}, + service_uuids=[], + source="local", +) + +GENERIC_BLUETOOTH_SERVICE_INFO_2 = BluetoothServiceInfo( + name="Generic", + address="aa:bb:cc:dd:ee:ff", + rssi=-95, + manufacturer_data={ + 2: b"\x01\x01\x01\x01\x01\x01\x01\x01", + }, + service_data={}, + service_uuids=[], + source="local", +) + + +class MyCoordinator(ActiveBluetoothDataUpdateCoordinator[dict[str, Any]]): + """An example coordinator that subclasses ActiveBluetoothDataUpdateCoordinator.""" + + def __init__( + self, + hass: HomeAssistant, + logger: logging.Logger, + *, + address: str, + mode: BluetoothScanningMode, + needs_poll_method: Callable[[BluetoothServiceInfoBleak, float | None], bool], + poll_method: Callable[ + [BluetoothServiceInfoBleak], + Coroutine[Any, Any, _T], + ] + | None = None, + poll_debouncer: Debouncer[Coroutine[Any, Any, None]] | None = None, + connectable: bool = True, + ) -> None: + """Initialize the coordinator.""" + self.passive_data: dict[str, Any] = {} + super().__init__( + hass=hass, + logger=logger, + address=address, + mode=mode, + needs_poll_method=needs_poll_method, + poll_method=poll_method, + poll_debouncer=poll_debouncer, + connectable=connectable, + ) + + def _async_handle_bluetooth_event( + self, + service_info: BluetoothServiceInfo, + change: BluetoothChange, + ) -> None: + """Handle a Bluetooth event.""" + self.passive_data = {"rssi": service_info.rssi} + super()._async_handle_bluetooth_event(service_info, change) + + +async def test_basic_usage(hass, mock_bleak_scanner_start, mock_bluetooth_adapters): + """Test basic usage of the ActiveBluetoothDataUpdateCoordinator.""" + await async_setup_component(hass, DOMAIN, {DOMAIN: {}}) + + def _needs_poll( + service_info: BluetoothServiceInfoBleak, seconds_since_last_poll: float | None + ) -> bool: + return True + + async def _poll_method(service_info: BluetoothServiceInfoBleak) -> dict[str, Any]: + return {"fake": "data"} + + coordinator = MyCoordinator( + hass=hass, + logger=_LOGGER, + address="aa:bb:cc:dd:ee:ff", + mode=BluetoothScanningMode.ACTIVE, + needs_poll_method=_needs_poll, + poll_method=_poll_method, + ) + assert coordinator.available is False # no data yet + + mock_listener = MagicMock() + unregister_listener = coordinator.async_add_listener(mock_listener) + + cancel = coordinator.async_start() + + inject_bluetooth_service_info(hass, GENERIC_BLUETOOTH_SERVICE_INFO) + await hass.async_block_till_done() + assert coordinator.passive_data == {"rssi": GENERIC_BLUETOOTH_SERVICE_INFO.rssi} + assert coordinator.data == {"fake": "data"} + + cancel() + unregister_listener() + + +async def test_bleak_error_during_polling( + hass, mock_bleak_scanner_start, mock_bluetooth_adapters +): + """Test bleak error during polling ActiveBluetoothDataUpdateCoordinator.""" + await async_setup_component(hass, DOMAIN, {DOMAIN: {}}) + poll_count = 0 + + def _needs_poll( + service_info: BluetoothServiceInfoBleak, seconds_since_last_poll: float | None + ) -> bool: + return True + + async def _poll_method(service_info: BluetoothServiceInfoBleak) -> dict[str, Any]: + nonlocal poll_count + poll_count += 1 + if poll_count == 1: + raise BleakError("fake bleak error") + return {"fake": "data"} + + coordinator = MyCoordinator( + hass=hass, + logger=_LOGGER, + address="aa:bb:cc:dd:ee:ff", + mode=BluetoothScanningMode.ACTIVE, + needs_poll_method=_needs_poll, + poll_method=_poll_method, + poll_debouncer=Debouncer(hass, _LOGGER, cooldown=0, immediate=True), + ) + assert coordinator.available is False # no data yet + + mock_listener = MagicMock() + unregister_listener = coordinator.async_add_listener(mock_listener) + + cancel = coordinator.async_start() + + inject_bluetooth_service_info(hass, GENERIC_BLUETOOTH_SERVICE_INFO) + await hass.async_block_till_done() + assert coordinator.passive_data == {"rssi": GENERIC_BLUETOOTH_SERVICE_INFO.rssi} + assert coordinator.data is None + assert coordinator.last_poll_successful is False + + inject_bluetooth_service_info(hass, GENERIC_BLUETOOTH_SERVICE_INFO_2) + await hass.async_block_till_done() + assert coordinator.passive_data == {"rssi": GENERIC_BLUETOOTH_SERVICE_INFO_2.rssi} + assert coordinator.data == {"fake": "data"} + assert coordinator.last_poll_successful is True + + cancel() + unregister_listener() + + +async def test_generic_exception_during_polling( + hass, mock_bleak_scanner_start, mock_bluetooth_adapters +): + """Test generic exception during polling ActiveBluetoothDataUpdateCoordinator.""" + await async_setup_component(hass, DOMAIN, {DOMAIN: {}}) + poll_count = 0 + + def _needs_poll( + service_info: BluetoothServiceInfoBleak, seconds_since_last_poll: float | None + ) -> bool: + return True + + async def _poll_method(service_info: BluetoothServiceInfoBleak) -> dict[str, Any]: + nonlocal poll_count + poll_count += 1 + if poll_count == 1: + raise ValueError("fake error") + return {"fake": "data"} + + coordinator = MyCoordinator( + hass=hass, + logger=_LOGGER, + address="aa:bb:cc:dd:ee:ff", + mode=BluetoothScanningMode.ACTIVE, + needs_poll_method=_needs_poll, + poll_method=_poll_method, + poll_debouncer=Debouncer(hass, _LOGGER, cooldown=0, immediate=True), + ) + assert coordinator.available is False # no data yet + + mock_listener = MagicMock() + unregister_listener = coordinator.async_add_listener(mock_listener) + + cancel = coordinator.async_start() + + inject_bluetooth_service_info(hass, GENERIC_BLUETOOTH_SERVICE_INFO) + await hass.async_block_till_done() + assert coordinator.passive_data == {"rssi": GENERIC_BLUETOOTH_SERVICE_INFO.rssi} + assert coordinator.data is None + assert coordinator.last_poll_successful is False + + inject_bluetooth_service_info(hass, GENERIC_BLUETOOTH_SERVICE_INFO_2) + await hass.async_block_till_done() + assert coordinator.passive_data == {"rssi": GENERIC_BLUETOOTH_SERVICE_INFO_2.rssi} + assert coordinator.data == {"fake": "data"} + assert coordinator.last_poll_successful is True + + cancel() + unregister_listener() + + +async def test_polling_debounce( + hass, mock_bleak_scanner_start, mock_bluetooth_adapters +): + """Test basic usage of the ActiveBluetoothDataUpdateCoordinator.""" + await async_setup_component(hass, DOMAIN, {DOMAIN: {}}) + poll_count = 0 + + def _needs_poll( + service_info: BluetoothServiceInfoBleak, seconds_since_last_poll: float | None + ) -> bool: + return True + + async def _poll_method(service_info: BluetoothServiceInfoBleak) -> dict[str, Any]: + nonlocal poll_count + poll_count += 1 + await asyncio.sleep(0.0001) + return {"poll_count": poll_count} + + coordinator = MyCoordinator( + hass=hass, + logger=_LOGGER, + address="aa:bb:cc:dd:ee:ff", + mode=BluetoothScanningMode.ACTIVE, + needs_poll_method=_needs_poll, + poll_method=_poll_method, + ) + assert coordinator.available is False # no data yet + + mock_listener = MagicMock() + unregister_listener = coordinator.async_add_listener(mock_listener) + + cancel = coordinator.async_start() + + inject_bluetooth_service_info(hass, GENERIC_BLUETOOTH_SERVICE_INFO) + inject_bluetooth_service_info(hass, GENERIC_BLUETOOTH_SERVICE_INFO_2) + await hass.async_block_till_done() + assert coordinator.passive_data == {"rssi": GENERIC_BLUETOOTH_SERVICE_INFO.rssi} + # We should only get one poll because of the debounce + assert coordinator.data == {"poll_count": 1} + + cancel() + unregister_listener() + + +async def test_polling_debounce_with_custom_debouncer( + hass, mock_bleak_scanner_start, mock_bluetooth_adapters +): + """Test basic usage of the ActiveBluetoothDataUpdateCoordinator.""" + await async_setup_component(hass, DOMAIN, {DOMAIN: {}}) + poll_count = 0 + + def _needs_poll( + service_info: BluetoothServiceInfoBleak, seconds_since_last_poll: float | None + ) -> bool: + return True + + async def _poll_method(service_info: BluetoothServiceInfoBleak) -> dict[str, Any]: + nonlocal poll_count + poll_count += 1 + await asyncio.sleep(0.0001) + return {"poll_count": poll_count} + + coordinator = MyCoordinator( + hass=hass, + logger=_LOGGER, + address="aa:bb:cc:dd:ee:ff", + mode=BluetoothScanningMode.ACTIVE, + needs_poll_method=_needs_poll, + poll_method=_poll_method, + poll_debouncer=Debouncer(hass, _LOGGER, cooldown=0.1, immediate=True), + ) + assert coordinator.available is False # no data yet + + mock_listener = MagicMock() + unregister_listener = coordinator.async_add_listener(mock_listener) + + cancel = coordinator.async_start() + + inject_bluetooth_service_info(hass, GENERIC_BLUETOOTH_SERVICE_INFO) + inject_bluetooth_service_info(hass, GENERIC_BLUETOOTH_SERVICE_INFO_2) + await hass.async_block_till_done() + assert coordinator.passive_data == {"rssi": GENERIC_BLUETOOTH_SERVICE_INFO.rssi} + # We should only get one poll because of the debounce + assert coordinator.data == {"poll_count": 1} + + cancel() + unregister_listener() + + +async def test_polling_rejecting_the_first_time( + hass, mock_bleak_scanner_start, mock_bluetooth_adapters +): + """Test need_poll rejects the first time ActiveBluetoothDataUpdateCoordinator.""" + await async_setup_component(hass, DOMAIN, {DOMAIN: {}}) + attempt = 0 + + def _needs_poll( + service_info: BluetoothServiceInfoBleak, seconds_since_last_poll: float | None + ) -> bool: + nonlocal attempt + attempt += 1 + return attempt != 1 + + async def _poll_method(service_info: BluetoothServiceInfoBleak) -> dict[str, Any]: + return {"fake": "data"} + + coordinator = MyCoordinator( + hass=hass, + logger=_LOGGER, + address="aa:bb:cc:dd:ee:ff", + mode=BluetoothScanningMode.ACTIVE, + needs_poll_method=_needs_poll, + poll_method=_poll_method, + ) + assert coordinator.available is False # no data yet + + mock_listener = MagicMock() + unregister_listener = coordinator.async_add_listener(mock_listener) + + cancel = coordinator.async_start() + + inject_bluetooth_service_info(hass, GENERIC_BLUETOOTH_SERVICE_INFO) + await hass.async_block_till_done() + assert coordinator.passive_data == {"rssi": GENERIC_BLUETOOTH_SERVICE_INFO.rssi} + # First poll is rejected, so no data yet + assert coordinator.data is None + + inject_bluetooth_service_info(hass, GENERIC_BLUETOOTH_SERVICE_INFO) + await hass.async_block_till_done() + assert coordinator.passive_data == {"rssi": GENERIC_BLUETOOTH_SERVICE_INFO.rssi} + # Data is the same so no poll check + assert coordinator.data is None + + inject_bluetooth_service_info(hass, GENERIC_BLUETOOTH_SERVICE_INFO_2) + await hass.async_block_till_done() + assert coordinator.passive_data == {"rssi": GENERIC_BLUETOOTH_SERVICE_INFO_2.rssi} + # Data is different so poll is done + assert coordinator.data == {"fake": "data"} + + inject_bluetooth_service_info(hass, GENERIC_BLUETOOTH_SERVICE_INFO) + await hass.async_block_till_done() + assert coordinator.passive_data == {"rssi": GENERIC_BLUETOOTH_SERVICE_INFO.rssi} + # Data is different again so poll is done + assert coordinator.data == {"fake": "data"} + + cancel() + unregister_listener()