mirror of
https://github.com/home-assistant/core.git
synced 2025-11-04 16:39:28 +00:00
423 lines
16 KiB
Python
423 lines
16 KiB
Python
"""The bluetooth integration."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from collections.abc import Callable, Iterable
|
|
from functools import partial
|
|
import itertools
|
|
import logging
|
|
|
|
from bleak_retry_connector import BleakSlotManager
|
|
from bluetooth_adapters import (
|
|
ADAPTER_TYPE,
|
|
BluetoothAdapters,
|
|
adapter_human_name,
|
|
adapter_model,
|
|
)
|
|
from habluetooth import (
|
|
BaseHaRemoteScanner,
|
|
BaseHaScanner,
|
|
BluetoothManager,
|
|
BluetoothScanningMode,
|
|
HaScanner,
|
|
)
|
|
|
|
from homeassistant import config_entries
|
|
from homeassistant.const import EVENT_HOMEASSISTANT_STOP, EVENT_LOGGING_CHANGED
|
|
from homeassistant.core import (
|
|
CALLBACK_TYPE,
|
|
Event,
|
|
HomeAssistant,
|
|
callback as hass_callback,
|
|
)
|
|
from homeassistant.helpers import discovery_flow, issue_registry as ir
|
|
from homeassistant.helpers.dispatcher import async_dispatcher_connect
|
|
from homeassistant.util.package import is_docker_env
|
|
|
|
from .const import (
|
|
CONF_SOURCE,
|
|
CONF_SOURCE_CONFIG_ENTRY_ID,
|
|
CONF_SOURCE_DEVICE_ID,
|
|
CONF_SOURCE_DOMAIN,
|
|
CONF_SOURCE_MODEL,
|
|
DOMAIN,
|
|
)
|
|
from .match import (
|
|
ADDRESS,
|
|
CALLBACK,
|
|
CONNECTABLE,
|
|
BluetoothCallbackMatcher,
|
|
BluetoothCallbackMatcherIndex,
|
|
BluetoothCallbackMatcherWithCallback,
|
|
IntegrationMatcher,
|
|
ble_device_matches,
|
|
)
|
|
from .models import BluetoothCallback, BluetoothChange, BluetoothServiceInfoBleak
|
|
from .storage import BluetoothStorage
|
|
from .util import async_load_history_from_system
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
class HomeAssistantBluetoothManager(BluetoothManager):
|
|
"""Manage Bluetooth for Home Assistant."""
|
|
|
|
__slots__ = (
|
|
"_callback_index",
|
|
"_cancel_logging_listener",
|
|
"_integration_matcher",
|
|
"hass",
|
|
"storage",
|
|
)
|
|
|
|
def __init__(
|
|
self,
|
|
hass: HomeAssistant,
|
|
integration_matcher: IntegrationMatcher,
|
|
bluetooth_adapters: BluetoothAdapters,
|
|
storage: BluetoothStorage,
|
|
slot_manager: BleakSlotManager,
|
|
) -> None:
|
|
"""Init bluetooth manager."""
|
|
self.hass = hass
|
|
self.storage = storage
|
|
self._integration_matcher = integration_matcher
|
|
self._callback_index = BluetoothCallbackMatcherIndex()
|
|
self._cancel_logging_listener: CALLBACK_TYPE | None = None
|
|
super().__init__(bluetooth_adapters, slot_manager)
|
|
self._async_logging_changed()
|
|
|
|
@hass_callback
|
|
def _async_logging_changed(self, event: Event | None = None) -> None:
|
|
"""Handle logging change."""
|
|
self._debug = _LOGGER.isEnabledFor(logging.DEBUG)
|
|
|
|
def _async_trigger_matching_discovery(
|
|
self, service_info: BluetoothServiceInfoBleak
|
|
) -> None:
|
|
"""Trigger discovery for matching domains."""
|
|
discovery_key = discovery_flow.DiscoveryKey(
|
|
domain=DOMAIN,
|
|
key=service_info.address,
|
|
version=1,
|
|
)
|
|
for domain in self._integration_matcher.match_domains(service_info):
|
|
discovery_flow.async_create_flow(
|
|
self.hass,
|
|
domain,
|
|
{"source": config_entries.SOURCE_BLUETOOTH},
|
|
service_info,
|
|
discovery_key=discovery_key,
|
|
)
|
|
|
|
@hass_callback
|
|
def async_rediscover_address(self, address: str) -> None:
|
|
"""Trigger discovery of devices which have already been seen."""
|
|
self._integration_matcher.async_clear_address(address)
|
|
if service_info := self._connectable_history.get(address):
|
|
self._async_trigger_matching_discovery(service_info)
|
|
return
|
|
if service_info := self._all_history.get(address):
|
|
self._async_trigger_matching_discovery(service_info)
|
|
|
|
def _discover_service_info(self, service_info: BluetoothServiceInfoBleak) -> None:
|
|
matched_domains = self._integration_matcher.match_domains(service_info)
|
|
if self._debug:
|
|
_LOGGER.debug(
|
|
"%s: %s match: %s",
|
|
self._async_describe_source(service_info),
|
|
service_info,
|
|
matched_domains,
|
|
)
|
|
|
|
for match in self._callback_index.match_callbacks(service_info):
|
|
callback = match[CALLBACK]
|
|
try:
|
|
callback(service_info, BluetoothChange.ADVERTISEMENT)
|
|
except Exception:
|
|
_LOGGER.exception("Error in bluetooth callback")
|
|
|
|
if not matched_domains:
|
|
return # avoid creating DiscoveryKey if there are no matches
|
|
|
|
discovery_key = discovery_flow.DiscoveryKey(
|
|
domain=DOMAIN,
|
|
key=service_info.address,
|
|
version=1,
|
|
)
|
|
for domain in matched_domains:
|
|
discovery_flow.async_create_flow(
|
|
self.hass,
|
|
domain,
|
|
{"source": config_entries.SOURCE_BLUETOOTH},
|
|
service_info,
|
|
discovery_key=discovery_key,
|
|
)
|
|
|
|
def _address_disappeared(self, address: str) -> None:
|
|
"""Dismiss all discoveries for the given address."""
|
|
self._integration_matcher.async_clear_address(address)
|
|
for flow in self.hass.config_entries.flow.async_progress_by_init_data_type(
|
|
BluetoothServiceInfoBleak,
|
|
lambda service_info: bool(service_info.address == address),
|
|
):
|
|
self.hass.config_entries.flow.async_abort(flow["flow_id"])
|
|
|
|
async def async_setup(self) -> None:
|
|
"""Set up the bluetooth manager."""
|
|
await super().async_setup()
|
|
self._all_history, self._connectable_history = async_load_history_from_system(
|
|
self._bluetooth_adapters, self.storage
|
|
)
|
|
self._cancel_logging_listener = self.hass.bus.async_listen(
|
|
EVENT_LOGGING_CHANGED, self._async_logging_changed
|
|
)
|
|
self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self.async_stop)
|
|
seen: set[str] = set()
|
|
for address, service_info in itertools.chain(
|
|
self._connectable_history.items(), self._all_history.items()
|
|
):
|
|
if address in seen:
|
|
continue
|
|
seen.add(address)
|
|
self._async_trigger_matching_discovery(service_info)
|
|
async_dispatcher_connect(
|
|
self.hass,
|
|
config_entries.signal_discovered_config_entry_removed(DOMAIN),
|
|
self._handle_config_entry_removed,
|
|
)
|
|
|
|
def async_register_callback(
|
|
self,
|
|
callback: BluetoothCallback,
|
|
matcher: BluetoothCallbackMatcher | None,
|
|
) -> Callable[[], None]:
|
|
"""Register a callback."""
|
|
callback_matcher = BluetoothCallbackMatcherWithCallback(callback=callback)
|
|
if not matcher:
|
|
callback_matcher[CONNECTABLE] = True
|
|
else:
|
|
# We could write out every item in the typed dict here
|
|
# but that would be a bit inefficient and verbose.
|
|
callback_matcher.update(matcher)
|
|
callback_matcher[CONNECTABLE] = matcher.get(CONNECTABLE, True)
|
|
|
|
connectable = callback_matcher[CONNECTABLE]
|
|
self._callback_index.add_callback_matcher(callback_matcher)
|
|
|
|
def _async_remove_callback() -> None:
|
|
self._callback_index.remove_callback_matcher(callback_matcher)
|
|
|
|
# If we have history for the subscriber, we can trigger the callback
|
|
# immediately with the last packet so the subscriber can see the
|
|
# device.
|
|
history = self._connectable_history if connectable else self._all_history
|
|
service_infos: Iterable[BluetoothServiceInfoBleak] = []
|
|
if address := callback_matcher.get(ADDRESS):
|
|
if service_info := history.get(address):
|
|
service_infos = [service_info]
|
|
else:
|
|
service_infos = history.values()
|
|
|
|
for service_info in service_infos:
|
|
if ble_device_matches(callback_matcher, service_info):
|
|
try:
|
|
callback(service_info, BluetoothChange.ADVERTISEMENT)
|
|
except Exception:
|
|
_LOGGER.exception("Error in bluetooth callback")
|
|
|
|
return _async_remove_callback
|
|
|
|
@hass_callback
|
|
def async_stop(self, event: Event | None = None) -> None:
|
|
"""Stop the Bluetooth integration at shutdown."""
|
|
_LOGGER.debug("Stopping bluetooth manager")
|
|
self._async_save_scanner_histories()
|
|
super().async_stop()
|
|
if self._cancel_logging_listener:
|
|
self._cancel_logging_listener()
|
|
self._cancel_logging_listener = None
|
|
|
|
def _async_save_scanner_histories(self) -> None:
|
|
"""Save the scanner histories."""
|
|
for scanner in itertools.chain(
|
|
self._connectable_scanners, self._non_connectable_scanners
|
|
):
|
|
self._async_save_scanner_history(scanner)
|
|
|
|
def _async_save_scanner_history(self, scanner: BaseHaScanner) -> None:
|
|
"""Save the scanner history."""
|
|
self.storage.async_set_advertisement_history(
|
|
scanner.source, scanner.serialize_discovered_devices()
|
|
)
|
|
|
|
def _async_unregister_scanner(
|
|
self, scanner: BaseHaScanner, unregister: CALLBACK_TYPE
|
|
) -> None:
|
|
"""Unregister a scanner."""
|
|
unregister()
|
|
self._async_save_scanner_history(scanner)
|
|
|
|
@hass_callback
|
|
def async_register_hass_scanner(
|
|
self,
|
|
scanner: BaseHaScanner,
|
|
connection_slots: int | None = None,
|
|
source_domain: str | None = None,
|
|
source_model: str | None = None,
|
|
source_config_entry_id: str | None = None,
|
|
source_device_id: str | None = None,
|
|
) -> CALLBACK_TYPE:
|
|
"""Register a scanner."""
|
|
cancel = self.async_register_scanner(scanner, connection_slots)
|
|
if (
|
|
isinstance(scanner, BaseHaRemoteScanner)
|
|
and source_domain
|
|
and source_config_entry_id
|
|
):
|
|
self.hass.async_create_task(
|
|
self.hass.config_entries.flow.async_init(
|
|
DOMAIN,
|
|
context={"source": config_entries.SOURCE_INTEGRATION_DISCOVERY},
|
|
data={
|
|
CONF_SOURCE: scanner.source,
|
|
CONF_SOURCE_DOMAIN: source_domain,
|
|
CONF_SOURCE_MODEL: source_model,
|
|
CONF_SOURCE_CONFIG_ENTRY_ID: source_config_entry_id,
|
|
CONF_SOURCE_DEVICE_ID: source_device_id,
|
|
},
|
|
)
|
|
)
|
|
return cancel
|
|
|
|
def async_register_scanner(
|
|
self,
|
|
scanner: BaseHaScanner,
|
|
connection_slots: int | None = None,
|
|
) -> CALLBACK_TYPE:
|
|
"""Register a scanner."""
|
|
if history := self.storage.async_get_advertisement_history(scanner.source):
|
|
scanner.restore_discovered_devices(history)
|
|
|
|
unregister = super().async_register_scanner(scanner, connection_slots)
|
|
return partial(self._async_unregister_scanner, scanner, unregister)
|
|
|
|
@hass_callback
|
|
def async_remove_scanner(self, source: str) -> None:
|
|
"""Remove a scanner."""
|
|
self.storage.async_remove_advertisement_history(source)
|
|
if entry := self.hass.config_entries.async_entry_for_domain_unique_id(
|
|
DOMAIN, source
|
|
):
|
|
self.hass.async_create_task(
|
|
self.hass.config_entries.async_remove(entry.entry_id),
|
|
f"Removing {source} Bluetooth config entry",
|
|
)
|
|
|
|
@hass_callback
|
|
def _handle_config_entry_removed(
|
|
self,
|
|
entry: config_entries.ConfigEntry,
|
|
) -> None:
|
|
"""Handle config entry changes."""
|
|
for discovery_key in entry.discovery_keys[DOMAIN]:
|
|
if discovery_key.version != 1 or not isinstance(discovery_key.key, str):
|
|
continue
|
|
address = discovery_key.key
|
|
_LOGGER.debug("Rediscover address %s", address)
|
|
self.async_rediscover_address(address)
|
|
|
|
def on_scanner_start(self, scanner: BaseHaScanner) -> None:
|
|
"""Handle when a scanner starts.
|
|
|
|
Create or delete repair issues for local adapters based on degraded mode.
|
|
"""
|
|
super().on_scanner_start(scanner)
|
|
|
|
# Only handle repair issues for local adapters (HaScanner instances)
|
|
if not isinstance(scanner, HaScanner):
|
|
return
|
|
self.async_check_degraded_mode(scanner)
|
|
self.async_check_scanning_mode(scanner)
|
|
|
|
@hass_callback
|
|
def async_check_scanning_mode(self, scanner: HaScanner) -> None:
|
|
"""Check if the scanner is running in passive mode when active mode is requested."""
|
|
passive_mode_issue_id = f"bluetooth_adapter_passive_mode_{scanner.source}"
|
|
|
|
# Check if scanner is NOT in passive mode when active mode was requested
|
|
if not (
|
|
scanner.requested_mode is BluetoothScanningMode.ACTIVE
|
|
and scanner.current_mode is BluetoothScanningMode.PASSIVE
|
|
):
|
|
# Delete passive mode issue if it exists and we're not in passive fallback
|
|
ir.async_delete_issue(self.hass, DOMAIN, passive_mode_issue_id)
|
|
return
|
|
|
|
# Create repair issue for passive mode fallback
|
|
adapter_name = adapter_human_name(
|
|
scanner.adapter, scanner.mac_address or "00:00:00:00:00:00"
|
|
)
|
|
adapter_details = self._bluetooth_adapters.adapters.get(scanner.adapter)
|
|
model = adapter_model(adapter_details) if adapter_details else None
|
|
|
|
# Determine adapter type for specific instructions
|
|
# Default to USB for any other type or unknown
|
|
if adapter_details and adapter_details.get(ADAPTER_TYPE) == "uart":
|
|
translation_key = "bluetooth_adapter_passive_mode_uart"
|
|
else:
|
|
translation_key = "bluetooth_adapter_passive_mode_usb"
|
|
|
|
ir.async_create_issue(
|
|
self.hass,
|
|
DOMAIN,
|
|
passive_mode_issue_id,
|
|
is_fixable=False, # Requires a reboot or unplug
|
|
severity=ir.IssueSeverity.WARNING,
|
|
translation_key=translation_key,
|
|
translation_placeholders={
|
|
"adapter": adapter_name,
|
|
"model": model or "Unknown",
|
|
},
|
|
)
|
|
|
|
@hass_callback
|
|
def async_check_degraded_mode(self, scanner: HaScanner) -> None:
|
|
"""Check if we are in degraded mode and create/delete repair issues."""
|
|
issue_id = f"bluetooth_adapter_missing_permissions_{scanner.source}"
|
|
|
|
# Delete any existing issue if not in degraded mode
|
|
if not self.is_operating_degraded():
|
|
ir.async_delete_issue(self.hass, DOMAIN, issue_id)
|
|
return
|
|
|
|
# Only create repair issues for Docker-based installations where users
|
|
# can fix permissions. This includes: Home Assistant Supervised,
|
|
# Home Assistant Container, and third-party containers
|
|
if not is_docker_env():
|
|
return
|
|
|
|
# Create repair issue for degraded mode in Docker (including Supervised)
|
|
adapter_name = adapter_human_name(
|
|
scanner.adapter, scanner.mac_address or "00:00:00:00:00:00"
|
|
)
|
|
|
|
# Try to get adapter details from the bluetooth adapters
|
|
adapter_details = self._bluetooth_adapters.adapters.get(scanner.adapter)
|
|
model = adapter_model(adapter_details) if adapter_details else None
|
|
|
|
ir.async_create_issue(
|
|
self.hass,
|
|
DOMAIN,
|
|
issue_id,
|
|
is_fixable=False, # Not fixable from within HA - requires
|
|
# container restart with new permissions
|
|
severity=ir.IssueSeverity.WARNING,
|
|
translation_key="bluetooth_adapter_missing_permissions",
|
|
translation_placeholders={
|
|
"adapter": adapter_name,
|
|
"model": model or "Unknown",
|
|
"docs_url": "https://www.home-assistant.io/integrations/bluetooth/#additional-details-for-container",
|
|
},
|
|
)
|