Split bluetooth manager so it can be extracted into the habluetooth lib (#105015)

This commit is contained in:
J. Nick Koston 2023-12-05 08:54:50 -10:00 committed by GitHub
parent 94d168e20e
commit 3c635fdbf2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 195 additions and 170 deletions

View File

@ -81,7 +81,7 @@ from .const import (
LINUX_FIRMWARE_LOAD_FALLBACK_SECONDS,
SOURCE_LOCAL,
)
from .manager import MONOTONIC_TIME, BluetoothManager
from .manager import MONOTONIC_TIME, HomeAssistantBluetoothManager
from .match import BluetoothCallbackMatcher, IntegrationMatcher
from .models import BluetoothCallback, BluetoothChange
from .storage import BluetoothStorage
@ -143,11 +143,13 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
await bluetooth_storage.async_setup()
slot_manager = BleakSlotManager()
await slot_manager.async_setup()
manager = BluetoothManager(
manager = HomeAssistantBluetoothManager(
hass, integration_matcher, bluetooth_adapters, bluetooth_storage, slot_manager
)
await manager.async_setup()
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, manager.async_stop)
hass.bus.async_listen_once(
EVENT_HOMEASSISTANT_STOP, lambda event: manager.async_stop()
)
hass.data[DATA_MANAGER] = models.MANAGER = manager
adapters = await manager.async_get_bluetooth_adapters()
@ -284,7 +286,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
passive = entry.options.get(CONF_PASSIVE)
mode = BluetoothScanningMode.PASSIVE if passive else BluetoothScanningMode.ACTIVE
new_info_callback = async_get_advertisement_callback(hass)
manager: BluetoothManager = hass.data[DATA_MANAGER]
manager: HomeAssistantBluetoothManager = hass.data[DATA_MANAGER]
scanner = HaScanner(mode, adapter, address, new_info_callback)
try:
scanner.async_setup()

View File

@ -16,7 +16,7 @@ from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback as hass_ca
from .base_scanner import BaseHaScanner, BluetoothScannerDevice
from .const import DATA_MANAGER
from .manager import BluetoothManager
from .manager import HomeAssistantBluetoothManager
from .match import BluetoothCallbackMatcher
from .models import BluetoothCallback, BluetoothChange, ProcessAdvertisementCallback
from .wrappers import HaBleakScannerWrapper
@ -25,9 +25,9 @@ if TYPE_CHECKING:
from bleak.backends.device import BLEDevice
def _get_manager(hass: HomeAssistant) -> BluetoothManager:
def _get_manager(hass: HomeAssistant) -> HomeAssistantBluetoothManager:
"""Get the bluetooth manager."""
return cast(BluetoothManager, hass.data[DATA_MANAGER])
return cast(HomeAssistantBluetoothManager, hass.data[DATA_MANAGER])
@hass_callback

View File

@ -3,7 +3,6 @@ from __future__ import annotations
import asyncio
from collections.abc import Callable, Iterable
from datetime import datetime, timedelta
import itertools
import logging
from typing import TYPE_CHECKING, Any, Final
@ -28,7 +27,6 @@ from homeassistant.core import (
callback as hass_callback,
)
from homeassistant.helpers import discovery_flow
from homeassistant.helpers.event import async_track_time_interval
from .base_scanner import BaseHaScanner, BluetoothScannerDevice
from .const import (
@ -100,16 +98,12 @@ class BluetoothManager:
"""Manage Bluetooth."""
__slots__ = (
"hass",
"_integration_matcher",
"_cancel_unavailable_tracking",
"_cancel_logging_listener",
"_advertisement_tracker",
"_fallback_intervals",
"_intervals",
"_unavailable_callbacks",
"_connectable_unavailable_callbacks",
"_callback_index",
"_bleak_callbacks",
"_all_history",
"_connectable_history",
@ -122,21 +116,17 @@ class BluetoothManager:
"slot_manager",
"_debug",
"shutdown",
"_loop",
)
def __init__(
self,
hass: HomeAssistant,
integration_matcher: IntegrationMatcher,
bluetooth_adapters: BluetoothAdapters,
storage: BluetoothStorage,
slot_manager: BleakSlotManager,
) -> None:
"""Init bluetooth manager."""
self.hass = hass
self._integration_matcher = integration_matcher
self._cancel_unavailable_tracking: CALLBACK_TYPE | None = None
self._cancel_logging_listener: CALLBACK_TYPE | None = None
self._cancel_unavailable_tracking: asyncio.TimerHandle | None = None
self._advertisement_tracker = AdvertisementTracker()
self._fallback_intervals = self._advertisement_tracker.fallback_intervals
@ -149,7 +139,6 @@ class BluetoothManager:
str, list[Callable[[BluetoothServiceInfoBleak], None]]
] = {}
self._callback_index = BluetoothCallbackMatcherIndex()
self._bleak_callbacks: list[
tuple[AdvertisementDataCallback, dict[str, set[str]]]
] = []
@ -164,6 +153,7 @@ class BluetoothManager:
self.slot_manager = slot_manager
self._debug = _LOGGER.isEnabledFor(logging.DEBUG)
self.shutdown = False
self._loop: asyncio.AbstractEventLoop | None = None
@property
def supports_passive_scan(self) -> bool:
@ -206,7 +196,6 @@ class BluetoothManager:
return adapter
return None
@hass_callback
def async_scanner_by_source(self, source: str) -> BaseHaScanner | None:
"""Return the scanner for a source."""
return self._sources.get(source)
@ -229,45 +218,22 @@ class BluetoothManager:
self._adapters = self._bluetooth_adapters.adapters
return self._find_adapter_by_address(address)
@hass_callback
def _async_logging_changed(self, event: Event) -> None:
"""Handle logging change."""
self._debug = _LOGGER.isEnabledFor(logging.DEBUG)
async def async_setup(self) -> None:
"""Set up the bluetooth manager."""
self._loop = asyncio.get_running_loop()
await self._bluetooth_adapters.refresh()
install_multiple_bleak_catcher()
self._all_history, self._connectable_history = async_load_history_from_system(
self._bluetooth_adapters, self.storage
)
self._cancel_logging_listener = self.hass.bus.async_listen(
EVENT_LOGGING_CHANGED, self._async_logging_changed
)
self.async_setup_unavailable_tracking()
seen: set[str] = set()
for address, service_info in itertools.chain(
self._connectable_history.items(), self._all_history.items()
):
if address in seen:
continue
seen.add(address)
self._async_trigger_matching_discovery(service_info)
@hass_callback
def async_stop(self, event: Event) -> None:
def async_stop(self) -> None:
"""Stop the Bluetooth integration at shutdown."""
_LOGGER.debug("Stopping bluetooth manager")
self.shutdown = True
if self._cancel_unavailable_tracking:
self._cancel_unavailable_tracking()
self._cancel_unavailable_tracking.cancel()
self._cancel_unavailable_tracking = None
if self._cancel_logging_listener:
self._cancel_logging_listener()
self._cancel_logging_listener = None
uninstall_multiple_bleak_catcher()
@hass_callback
def async_scanner_devices_by_address(
self, address: str, connectable: bool
) -> list[BluetoothScannerDevice]:
@ -288,7 +254,6 @@ class BluetoothManager:
)
]
@hass_callback
def _async_all_discovered_addresses(self, connectable: bool) -> Iterable[str]:
"""Return all of discovered addresses.
@ -304,24 +269,25 @@ class BluetoothManager:
for scanner in self._non_connectable_scanners
)
@hass_callback
def async_discovered_devices(self, connectable: bool) -> list[BLEDevice]:
"""Return all of combined best path to discovered from all the scanners."""
histories = self._connectable_history if connectable else self._all_history
return [history.device for history in histories.values()]
@hass_callback
def async_setup_unavailable_tracking(self) -> None:
"""Set up the unavailable tracking."""
self._cancel_unavailable_tracking = async_track_time_interval(
self.hass,
self._async_check_unavailable,
timedelta(seconds=UNAVAILABLE_TRACK_SECONDS),
name="Bluetooth manager unavailable tracking",
self._schedule_unavailable_tracking()
def _schedule_unavailable_tracking(self) -> None:
"""Schedule the unavailable tracking."""
if TYPE_CHECKING:
assert self._loop is not None
loop = self._loop
self._cancel_unavailable_tracking = loop.call_at(
loop.time() + UNAVAILABLE_TRACK_SECONDS, self._async_check_unavailable
)
@hass_callback
def _async_check_unavailable(self, now: datetime) -> None:
def _async_check_unavailable(self) -> None:
"""Watch for unavailable devices and cleanup state history."""
monotonic_now = MONOTONIC_TIME()
connectable_history = self._connectable_history
@ -363,8 +329,7 @@ class BluetoothManager:
# available for both connectable and non-connectable
tracker.async_remove_fallback_interval(address)
tracker.async_remove_address(address)
self._integration_matcher.async_clear_address(address)
self._async_dismiss_discoveries(address)
self._address_disappeared(address)
service_info = history.pop(address)
@ -377,13 +342,13 @@ class BluetoothManager:
except Exception: # pylint: disable=broad-except
_LOGGER.exception("Error in unavailable callback")
def _async_dismiss_discoveries(self, address: str) -> None:
"""Dismiss all discoveries for the given address."""
for flow in self.hass.config_entries.flow.async_progress_by_init_data_type(
BluetoothServiceInfoBleak,
lambda service_info: bool(service_info.address == address),
):
self.hass.config_entries.flow.async_abort(flow["flow_id"])
self._schedule_unavailable_tracking()
def _address_disappeared(self, address: str) -> None:
"""Call when an address disappears from the stack.
This method is intended to be overridden by subclasses.
"""
def _prefer_previous_adv_from_different_source(
self,
@ -436,7 +401,6 @@ class BluetoothManager:
return False
return True
@hass_callback
def scanner_adv_received(self, service_info: BluetoothServiceInfoBleak) -> None:
"""Handle a new advertisement from any scanner.
@ -567,16 +531,6 @@ class BluetoothManager:
time=service_info.time,
)
matched_domains = self._integration_matcher.match_domains(service_info)
if self._debug:
_LOGGER.debug(
"%s: %s %s match: %s",
self._async_describe_source(service_info),
address,
service_info.advertisement,
matched_domains,
)
if (connectable or old_connectable_service_info) and (
bleak_callbacks := self._bleak_callbacks
):
@ -586,22 +540,14 @@ class BluetoothManager:
for callback_filters in bleak_callbacks:
_dispatch_bleak_callback(*callback_filters, device, advertisement_data)
for match in self._callback_index.match_callbacks(service_info):
callback = match[CALLBACK]
try:
callback(service_info, BluetoothChange.ADVERTISEMENT)
except Exception: # pylint: disable=broad-except
_LOGGER.exception("Error in bluetooth callback")
self._discover_service_info(service_info)
for domain in matched_domains:
discovery_flow.async_create_flow(
self.hass,
domain,
{"source": config_entries.SOURCE_BLUETOOTH},
service_info,
)
def _discover_service_info(self, service_info: BluetoothServiceInfoBleak) -> None:
"""Discover a new service info.
This method is intended to be overridden by subclasses.
"""
@hass_callback
def _async_describe_source(self, service_info: BluetoothServiceInfoBleak) -> str:
"""Describe a source."""
if scanner := self._sources.get(service_info.source):
@ -612,7 +558,6 @@ class BluetoothManager:
description += " [connectable]"
return description
@hass_callback
def async_track_unavailable(
self,
callback: Callable[[BluetoothServiceInfoBleak], None],
@ -626,7 +571,6 @@ class BluetoothManager:
unavailable_callbacks = self._unavailable_callbacks
unavailable_callbacks.setdefault(address, []).append(callback)
@hass_callback
def _async_remove_callback() -> None:
unavailable_callbacks[address].remove(callback)
if not unavailable_callbacks[address]:
@ -634,50 +578,6 @@ class BluetoothManager:
return _async_remove_callback
@hass_callback
def async_register_callback(
self,
callback: BluetoothCallback,
matcher: BluetoothCallbackMatcher | None,
) -> Callable[[], None]:
"""Register a callback."""
callback_matcher = BluetoothCallbackMatcherWithCallback(callback=callback)
if not matcher:
callback_matcher[CONNECTABLE] = True
else:
# We could write out every item in the typed dict here
# but that would be a bit inefficient and verbose.
callback_matcher.update(matcher)
callback_matcher[CONNECTABLE] = matcher.get(CONNECTABLE, True)
connectable = callback_matcher[CONNECTABLE]
self._callback_index.add_callback_matcher(callback_matcher)
@hass_callback
def _async_remove_callback() -> None:
self._callback_index.remove_callback_matcher(callback_matcher)
# If we have history for the subscriber, we can trigger the callback
# immediately with the last packet so the subscriber can see the
# device.
history = self._connectable_history if connectable else self._all_history
service_infos: Iterable[BluetoothServiceInfoBleak] = []
if address := callback_matcher.get(ADDRESS):
if service_info := history.get(address):
service_infos = [service_info]
else:
service_infos = history.values()
for service_info in service_infos:
if ble_device_matches(callback_matcher, service_info):
try:
callback(service_info, BluetoothChange.ADVERTISEMENT)
except Exception: # pylint: disable=broad-except
_LOGGER.exception("Error in bluetooth callback")
return _async_remove_callback
@hass_callback
def async_ble_device_from_address(
self, address: str, connectable: bool
) -> BLEDevice | None:
@ -687,13 +587,11 @@ class BluetoothManager:
return history.device
return None
@hass_callback
def async_address_present(self, address: str, connectable: bool) -> bool:
"""Return if the address is present."""
histories = self._connectable_history if connectable else self._all_history
return address in histories
@hass_callback
def async_discovered_service_info(
self, connectable: bool
) -> Iterable[BluetoothServiceInfoBleak]:
@ -701,7 +599,6 @@ class BluetoothManager:
histories = self._connectable_history if connectable else self._all_history
return histories.values()
@hass_callback
def async_last_service_info(
self, address: str, connectable: bool
) -> BluetoothServiceInfoBleak | None:
@ -709,28 +606,6 @@ class BluetoothManager:
histories = self._connectable_history if connectable else self._all_history
return histories.get(address)
def _async_trigger_matching_discovery(
self, service_info: BluetoothServiceInfoBleak
) -> None:
"""Trigger discovery for matching domains."""
for domain in self._integration_matcher.match_domains(service_info):
discovery_flow.async_create_flow(
self.hass,
domain,
{"source": config_entries.SOURCE_BLUETOOTH},
service_info,
)
@hass_callback
def async_rediscover_address(self, address: str) -> None:
"""Trigger discovery of devices which have already been seen."""
self._integration_matcher.async_clear_address(address)
if service_info := self._connectable_history.get(address):
self._async_trigger_matching_discovery(service_info)
return
if service_info := self._all_history.get(address):
self._async_trigger_matching_discovery(service_info)
def async_register_scanner(
self,
scanner: BaseHaScanner,
@ -758,7 +633,6 @@ class BluetoothManager:
self.slot_manager.register_adapter(scanner.adapter, connection_slots)
return _unregister_scanner
@hass_callback
def async_register_bleak_callback(
self, callback: AdvertisementDataCallback, filters: dict[str, set[str]]
) -> CALLBACK_TYPE:
@ -766,7 +640,6 @@ class BluetoothManager:
callback_entry = (callback, filters)
self._bleak_callbacks.append(callback_entry)
@hass_callback
def _remove_callback() -> None:
self._bleak_callbacks.remove(callback_entry)
@ -780,29 +653,180 @@ class BluetoothManager:
return _remove_callback
@hass_callback
def async_release_connection_slot(self, device: BLEDevice) -> None:
"""Release a connection slot."""
self.slot_manager.release_slot(device)
@hass_callback
def async_allocate_connection_slot(self, device: BLEDevice) -> bool:
"""Allocate a connection slot."""
return self.slot_manager.allocate_slot(device)
@hass_callback
def async_get_learned_advertising_interval(self, address: str) -> float | None:
"""Get the learned advertising interval for a MAC address."""
return self._intervals.get(address)
@hass_callback
def async_get_fallback_availability_interval(self, address: str) -> float | None:
"""Get the fallback availability timeout for a MAC address."""
return self._fallback_intervals.get(address)
@hass_callback
def async_set_fallback_availability_interval(
self, address: str, interval: float
) -> None:
"""Override the fallback availability timeout for a MAC address."""
self._fallback_intervals[address] = interval
class HomeAssistantBluetoothManager(BluetoothManager):
"""Manage Bluetooth for Home Assistant."""
__slots__ = (
"hass",
"_integration_matcher",
"_callback_index",
"_cancel_logging_listener",
)
def __init__(
self,
hass: HomeAssistant,
integration_matcher: IntegrationMatcher,
bluetooth_adapters: BluetoothAdapters,
storage: BluetoothStorage,
slot_manager: BleakSlotManager,
) -> None:
"""Init bluetooth manager."""
self.hass = hass
self._integration_matcher = integration_matcher
self._callback_index = BluetoothCallbackMatcherIndex()
self._cancel_logging_listener: CALLBACK_TYPE | None = None
super().__init__(bluetooth_adapters, storage, slot_manager)
@hass_callback
def _async_logging_changed(self, event: Event) -> None:
"""Handle logging change."""
self._debug = _LOGGER.isEnabledFor(logging.DEBUG)
def _async_trigger_matching_discovery(
self, service_info: BluetoothServiceInfoBleak
) -> None:
"""Trigger discovery for matching domains."""
for domain in self._integration_matcher.match_domains(service_info):
discovery_flow.async_create_flow(
self.hass,
domain,
{"source": config_entries.SOURCE_BLUETOOTH},
service_info,
)
@hass_callback
def async_rediscover_address(self, address: str) -> None:
"""Trigger discovery of devices which have already been seen."""
self._integration_matcher.async_clear_address(address)
if service_info := self._connectable_history.get(address):
self._async_trigger_matching_discovery(service_info)
return
if service_info := self._all_history.get(address):
self._async_trigger_matching_discovery(service_info)
def _discover_service_info(self, service_info: BluetoothServiceInfoBleak) -> None:
matched_domains = self._integration_matcher.match_domains(service_info)
if self._debug:
_LOGGER.debug(
"%s: %s %s match: %s",
self._async_describe_source(service_info),
service_info.address,
service_info.advertisement,
matched_domains,
)
for match in self._callback_index.match_callbacks(service_info):
callback = match[CALLBACK]
try:
callback(service_info, BluetoothChange.ADVERTISEMENT)
except Exception: # pylint: disable=broad-except
_LOGGER.exception("Error in bluetooth callback")
for domain in matched_domains:
discovery_flow.async_create_flow(
self.hass,
domain,
{"source": config_entries.SOURCE_BLUETOOTH},
service_info,
)
def _address_disappeared(self, address: str) -> None:
"""Dismiss all discoveries for the given address."""
self._integration_matcher.async_clear_address(address)
for flow in self.hass.config_entries.flow.async_progress_by_init_data_type(
BluetoothServiceInfoBleak,
lambda service_info: bool(service_info.address == address),
):
self.hass.config_entries.flow.async_abort(flow["flow_id"])
async def async_setup(self) -> None:
"""Set up the bluetooth manager."""
await super().async_setup()
self._all_history, self._connectable_history = async_load_history_from_system(
self._bluetooth_adapters, self.storage
)
self._cancel_logging_listener = self.hass.bus.async_listen(
EVENT_LOGGING_CHANGED, self._async_logging_changed
)
seen: set[str] = set()
for address, service_info in itertools.chain(
self._connectable_history.items(), self._all_history.items()
):
if address in seen:
continue
seen.add(address)
self._async_trigger_matching_discovery(service_info)
def async_register_callback(
self,
callback: BluetoothCallback,
matcher: BluetoothCallbackMatcher | None,
) -> Callable[[], None]:
"""Register a callback."""
callback_matcher = BluetoothCallbackMatcherWithCallback(callback=callback)
if not matcher:
callback_matcher[CONNECTABLE] = True
else:
# We could write out every item in the typed dict here
# but that would be a bit inefficient and verbose.
callback_matcher.update(matcher)
callback_matcher[CONNECTABLE] = matcher.get(CONNECTABLE, True)
connectable = callback_matcher[CONNECTABLE]
self._callback_index.add_callback_matcher(callback_matcher)
def _async_remove_callback() -> None:
self._callback_index.remove_callback_matcher(callback_matcher)
# If we have history for the subscriber, we can trigger the callback
# immediately with the last packet so the subscriber can see the
# device.
history = self._connectable_history if connectable else self._all_history
service_infos: Iterable[BluetoothServiceInfoBleak] = []
if address := callback_matcher.get(ADDRESS):
if service_info := history.get(address):
service_infos = [service_info]
else:
service_infos = history.values()
for service_info in service_infos:
if ble_device_matches(callback_matcher, service_info):
try:
callback(service_info, BluetoothChange.ADVERTISEMENT)
except Exception: # pylint: disable=broad-except
_LOGGER.exception("Error in bluetooth callback")
return _async_remove_callback
@hass_callback
def async_stop(self) -> None:
"""Stop the Bluetooth integration at shutdown."""
_LOGGER.debug("Stopping bluetooth manager")
super().async_stop()
if self._cancel_logging_listener:
self._cancel_logging_listener()
self._cancel_logging_listener = None

View File

@ -283,7 +283,6 @@ class HaBleakClientWrapper(BleakClient):
self.__disconnected_callback
),
timeout=self.__timeout,
hass=manager.hass,
)
if debug_logging:
# Only lookup the description if we are going to log it