mirror of
https://github.com/home-assistant/core.git
synced 2025-04-23 16:57:53 +00:00
Add bluetooth integration (#74653)
Co-authored-by: Paulus Schoutsen <paulus@home-assistant.io>
This commit is contained in:
parent
c27fbce7d0
commit
a697672944
@ -57,6 +57,7 @@ homeassistant.components.automation.*
|
||||
homeassistant.components.backup.*
|
||||
homeassistant.components.baf.*
|
||||
homeassistant.components.binary_sensor.*
|
||||
homeassistant.components.bluetooth.*
|
||||
homeassistant.components.bluetooth_tracker.*
|
||||
homeassistant.components.bmw_connected_drive.*
|
||||
homeassistant.components.bond.*
|
||||
|
@ -138,6 +138,8 @@ build.json @home-assistant/supervisor
|
||||
/homeassistant/components/blueprint/ @home-assistant/core
|
||||
/tests/components/blueprint/ @home-assistant/core
|
||||
/homeassistant/components/bluesound/ @thrawnarn
|
||||
/homeassistant/components/bluetooth/ @bdraco
|
||||
/tests/components/bluetooth/ @bdraco
|
||||
/homeassistant/components/bmw_connected_drive/ @gerard33 @rikroe
|
||||
/tests/components/bmw_connected_drive/ @gerard33 @rikroe
|
||||
/homeassistant/components/bond/ @bdraco @prystupa @joshs85 @marciogranzotto
|
||||
|
@ -70,7 +70,7 @@ LOGGING_INTEGRATIONS = {
|
||||
# To record data
|
||||
"recorder",
|
||||
}
|
||||
DISCOVERY_INTEGRATIONS = ("dhcp", "ssdp", "usb", "zeroconf")
|
||||
DISCOVERY_INTEGRATIONS = ("bluetooth", "dhcp", "ssdp", "usb", "zeroconf")
|
||||
STAGE_1_INTEGRATIONS = {
|
||||
# We need to make sure discovery integrations
|
||||
# update their deps before stage 2 integrations
|
||||
|
297
homeassistant/components/bluetooth/__init__.py
Normal file
297
homeassistant/components/bluetooth/__init__.py
Normal file
@ -0,0 +1,297 @@
|
||||
"""The bluetooth integration."""
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Callable
|
||||
import dataclasses
|
||||
from enum import Enum
|
||||
import fnmatch
|
||||
from functools import cached_property
|
||||
import logging
|
||||
import platform
|
||||
from typing import Final
|
||||
|
||||
from bleak import BleakError
|
||||
from bleak.backends.device import MANUFACTURERS, BLEDevice
|
||||
from bleak.backends.scanner import AdvertisementData
|
||||
from lru import LRU # pylint: disable=no-name-in-module
|
||||
|
||||
from homeassistant import config_entries
|
||||
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
|
||||
from homeassistant.core import (
|
||||
CALLBACK_TYPE,
|
||||
Event,
|
||||
HomeAssistant,
|
||||
callback as hass_callback,
|
||||
)
|
||||
from homeassistant.data_entry_flow import BaseServiceInfo
|
||||
from homeassistant.helpers import discovery_flow
|
||||
from homeassistant.helpers.typing import ConfigType
|
||||
from homeassistant.loader import BluetoothMatcher, async_get_bluetooth
|
||||
|
||||
from . import models
|
||||
from .const import DOMAIN
|
||||
from .models import HaBleakScanner
|
||||
from .usage import install_multiple_bleak_catcher
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
MAX_REMEMBER_ADDRESSES: Final = 2048
|
||||
|
||||
|
||||
class BluetoothScanningMode(Enum):
|
||||
"""The mode of scanning for bluetooth devices."""
|
||||
|
||||
PASSIVE = "passive"
|
||||
ACTIVE = "active"
|
||||
|
||||
|
||||
SCANNING_MODE_TO_BLEAK = {
|
||||
BluetoothScanningMode.ACTIVE: "active",
|
||||
BluetoothScanningMode.PASSIVE: "passive",
|
||||
}
|
||||
|
||||
LOCAL_NAME: Final = "local_name"
|
||||
SERVICE_UUID: Final = "service_uuid"
|
||||
MANUFACTURER_ID: Final = "manufacturer_id"
|
||||
MANUFACTURER_DATA_FIRST_BYTE: Final = "manufacturer_data_first_byte"
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class BluetoothServiceInfo(BaseServiceInfo):
|
||||
"""Prepared info from bluetooth entries."""
|
||||
|
||||
name: str
|
||||
address: str
|
||||
rssi: int
|
||||
manufacturer_data: dict[int, bytes]
|
||||
service_data: dict[str, bytes]
|
||||
service_uuids: list[str]
|
||||
|
||||
@classmethod
|
||||
def from_advertisement(
|
||||
cls, device: BLEDevice, advertisement_data: AdvertisementData
|
||||
) -> BluetoothServiceInfo:
|
||||
"""Create a BluetoothServiceInfo from an advertisement."""
|
||||
return cls(
|
||||
name=advertisement_data.local_name or device.name or device.address,
|
||||
address=device.address,
|
||||
rssi=device.rssi,
|
||||
manufacturer_data=advertisement_data.manufacturer_data,
|
||||
service_data=advertisement_data.service_data,
|
||||
service_uuids=advertisement_data.service_uuids,
|
||||
)
|
||||
|
||||
@cached_property
|
||||
def manufacturer(self) -> str | None:
|
||||
"""Convert manufacturer data to a string."""
|
||||
for manufacturer in self.manufacturer_data:
|
||||
if manufacturer in MANUFACTURERS:
|
||||
name: str = MANUFACTURERS[manufacturer]
|
||||
return name
|
||||
return None
|
||||
|
||||
@cached_property
|
||||
def manufacturer_id(self) -> int | None:
|
||||
"""Get the first manufacturer id."""
|
||||
for manufacturer in self.manufacturer_data:
|
||||
return manufacturer
|
||||
return None
|
||||
|
||||
|
||||
BluetoothChange = Enum("BluetoothChange", "ADVERTISEMENT")
|
||||
BluetoothCallback = Callable[[BluetoothServiceInfo, BluetoothChange], None]
|
||||
|
||||
|
||||
@hass_callback
|
||||
def async_register_callback(
|
||||
hass: HomeAssistant,
|
||||
callback: BluetoothCallback,
|
||||
match_dict: BluetoothMatcher | None,
|
||||
) -> Callable[[], None]:
|
||||
"""Register to receive a callback on bluetooth change.
|
||||
|
||||
Returns a callback that can be used to cancel the registration.
|
||||
"""
|
||||
manager: BluetoothManager = hass.data[DOMAIN]
|
||||
return manager.async_register_callback(callback, match_dict)
|
||||
|
||||
|
||||
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
||||
"""Set up the bluetooth integration."""
|
||||
integration_matchers = await async_get_bluetooth(hass)
|
||||
bluetooth_discovery = BluetoothManager(
|
||||
hass, integration_matchers, BluetoothScanningMode.PASSIVE
|
||||
)
|
||||
await bluetooth_discovery.async_setup()
|
||||
hass.data[DOMAIN] = bluetooth_discovery
|
||||
return True
|
||||
|
||||
|
||||
def _ble_device_matches(
|
||||
matcher: BluetoothMatcher, device: BLEDevice, advertisement_data: AdvertisementData
|
||||
) -> bool:
|
||||
"""Check if a ble device and advertisement_data matches the matcher."""
|
||||
if (
|
||||
matcher_local_name := matcher.get(LOCAL_NAME)
|
||||
) is not None and not fnmatch.fnmatch(
|
||||
advertisement_data.local_name or device.name or device.address,
|
||||
matcher_local_name,
|
||||
):
|
||||
return False
|
||||
|
||||
if (
|
||||
matcher_service_uuid := matcher.get(SERVICE_UUID)
|
||||
) is not None and matcher_service_uuid not in advertisement_data.service_uuids:
|
||||
return False
|
||||
|
||||
if (
|
||||
(matcher_manfacturer_id := matcher.get(MANUFACTURER_ID)) is not None
|
||||
and matcher_manfacturer_id not in advertisement_data.manufacturer_data
|
||||
):
|
||||
return False
|
||||
|
||||
if (
|
||||
matcher_manufacturer_data_first_byte := matcher.get(
|
||||
MANUFACTURER_DATA_FIRST_BYTE
|
||||
)
|
||||
) is not None and not any(
|
||||
matcher_manufacturer_data_first_byte == manufacturer_data[0]
|
||||
for manufacturer_data in advertisement_data.manufacturer_data.values()
|
||||
):
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
@hass_callback
|
||||
def async_enable_rssi_updates() -> None:
|
||||
"""Bleak filters out RSSI updates by default on linux only."""
|
||||
# We want RSSI updates
|
||||
if platform.system() == "Linux":
|
||||
from bleak.backends.bluezdbus import ( # pylint: disable=import-outside-toplevel
|
||||
scanner,
|
||||
)
|
||||
|
||||
scanner._ADVERTISING_DATA_PROPERTIES.add( # pylint: disable=protected-access
|
||||
"RSSI"
|
||||
)
|
||||
|
||||
|
||||
class BluetoothManager:
|
||||
"""Manage Bluetooth."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
hass: HomeAssistant,
|
||||
integration_matchers: list[BluetoothMatcher],
|
||||
scanning_mode: BluetoothScanningMode,
|
||||
) -> None:
|
||||
"""Init bluetooth discovery."""
|
||||
self.hass = hass
|
||||
self.scanning_mode = scanning_mode
|
||||
self._integration_matchers = integration_matchers
|
||||
self.scanner: HaBleakScanner | None = None
|
||||
self._cancel_device_detected: CALLBACK_TYPE | None = None
|
||||
self._callbacks: list[tuple[BluetoothCallback, BluetoothMatcher | None]] = []
|
||||
# Some devices use a random address so we need to use
|
||||
# an LRU to avoid memory issues.
|
||||
self._matched: LRU = LRU(MAX_REMEMBER_ADDRESSES)
|
||||
|
||||
async def async_setup(self) -> None:
|
||||
"""Set up BT Discovery."""
|
||||
try:
|
||||
self.scanner = HaBleakScanner(
|
||||
scanning_mode=SCANNING_MODE_TO_BLEAK[self.scanning_mode]
|
||||
)
|
||||
except (FileNotFoundError, BleakError) as ex:
|
||||
_LOGGER.warning(
|
||||
"Could not create bluetooth scanner (is bluetooth present and enabled?): %s",
|
||||
ex,
|
||||
)
|
||||
return
|
||||
async_enable_rssi_updates()
|
||||
install_multiple_bleak_catcher(self.scanner)
|
||||
# We have to start it right away as some integrations might
|
||||
# need it straight away.
|
||||
_LOGGER.debug("Starting bluetooth scanner")
|
||||
self.scanner.register_detection_callback(self.scanner.async_callback_dispatcher)
|
||||
self._cancel_device_detected = self.scanner.async_register_callback(
|
||||
self._device_detected, {}
|
||||
)
|
||||
self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self.async_stop)
|
||||
await self.scanner.start()
|
||||
|
||||
@hass_callback
|
||||
def _device_detected(
|
||||
self, device: BLEDevice, advertisement_data: AdvertisementData
|
||||
) -> None:
|
||||
"""Handle a detected device."""
|
||||
matched_domains: set[str] | None = None
|
||||
if device.address not in self._matched:
|
||||
matched_domains = {
|
||||
matcher["domain"]
|
||||
for matcher in self._integration_matchers
|
||||
if _ble_device_matches(matcher, device, advertisement_data)
|
||||
}
|
||||
if matched_domains:
|
||||
self._matched[device.address] = True
|
||||
_LOGGER.debug(
|
||||
"Device detected: %s with advertisement_data: %s matched domains: %s",
|
||||
device,
|
||||
advertisement_data,
|
||||
matched_domains,
|
||||
)
|
||||
|
||||
if not matched_domains and not self._callbacks:
|
||||
return
|
||||
|
||||
service_info: BluetoothServiceInfo | None = None
|
||||
for callback, matcher in self._callbacks:
|
||||
if matcher is None or _ble_device_matches(
|
||||
matcher, device, advertisement_data
|
||||
):
|
||||
if service_info is None:
|
||||
service_info = BluetoothServiceInfo.from_advertisement(
|
||||
device, advertisement_data
|
||||
)
|
||||
try:
|
||||
callback(service_info, BluetoothChange.ADVERTISEMENT)
|
||||
except Exception: # pylint: disable=broad-except
|
||||
_LOGGER.exception("Error in bluetooth callback")
|
||||
|
||||
if not matched_domains:
|
||||
return
|
||||
if service_info is None:
|
||||
service_info = BluetoothServiceInfo.from_advertisement(
|
||||
device, advertisement_data
|
||||
)
|
||||
for domain in matched_domains:
|
||||
discovery_flow.async_create_flow(
|
||||
self.hass,
|
||||
domain,
|
||||
{"source": config_entries.SOURCE_BLUETOOTH},
|
||||
service_info,
|
||||
)
|
||||
|
||||
@hass_callback
|
||||
def async_register_callback(
|
||||
self, callback: BluetoothCallback, match_dict: BluetoothMatcher | None = None
|
||||
) -> Callable[[], None]:
|
||||
"""Register a callback."""
|
||||
callback_entry = (callback, match_dict)
|
||||
self._callbacks.append(callback_entry)
|
||||
|
||||
@hass_callback
|
||||
def _async_remove_callback() -> None:
|
||||
self._callbacks.remove(callback_entry)
|
||||
|
||||
return _async_remove_callback
|
||||
|
||||
async def async_stop(self, event: Event) -> None:
|
||||
"""Stop bluetooth discovery."""
|
||||
if self._cancel_device_detected:
|
||||
self._cancel_device_detected()
|
||||
self._cancel_device_detected = None
|
||||
if self.scanner:
|
||||
await self.scanner.stop()
|
||||
models.HA_BLEAK_SCANNER = None
|
3
homeassistant/components/bluetooth/const.py
Normal file
3
homeassistant/components/bluetooth/const.py
Normal file
@ -0,0 +1,3 @@
|
||||
"""Constants for the Bluetooth integration."""
|
||||
|
||||
DOMAIN = "bluetooth"
|
10
homeassistant/components/bluetooth/manifest.json
Normal file
10
homeassistant/components/bluetooth/manifest.json
Normal file
@ -0,0 +1,10 @@
|
||||
{
|
||||
"domain": "bluetooth",
|
||||
"name": "Bluetooth",
|
||||
"documentation": "https://www.home-assistant.io/integrations/bluetooth",
|
||||
"dependencies": ["websocket_api"],
|
||||
"quality_scale": "internal",
|
||||
"requirements": ["bleak==0.14.3"],
|
||||
"codeowners": ["@bdraco"],
|
||||
"iot_class": "local_push"
|
||||
}
|
142
homeassistant/components/bluetooth/models.py
Normal file
142
homeassistant/components/bluetooth/models.py
Normal file
@ -0,0 +1,142 @@
|
||||
"""Models for bluetooth."""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import contextlib
|
||||
import logging
|
||||
from typing import Any, Final, cast
|
||||
|
||||
from bleak import BleakScanner
|
||||
from bleak.backends.device import BLEDevice
|
||||
from bleak.backends.scanner import AdvertisementData, AdvertisementDataCallback
|
||||
from lru import LRU # pylint: disable=no-name-in-module
|
||||
|
||||
from homeassistant.core import CALLBACK_TYPE, callback as hass_callback
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
FILTER_UUIDS: Final = "UUIDs"
|
||||
|
||||
HA_BLEAK_SCANNER: HaBleakScanner | None = None
|
||||
|
||||
MAX_HISTORY_SIZE: Final = 512
|
||||
|
||||
|
||||
def _dispatch_callback(
|
||||
callback: AdvertisementDataCallback,
|
||||
filters: dict[str, set[str]],
|
||||
device: BLEDevice,
|
||||
advertisement_data: AdvertisementData,
|
||||
) -> None:
|
||||
"""Dispatch the callback."""
|
||||
if not callback:
|
||||
# Callback destroyed right before being called, ignore
|
||||
return
|
||||
|
||||
if (uuids := filters.get(FILTER_UUIDS)) and not uuids.intersection(
|
||||
advertisement_data.service_uuids
|
||||
):
|
||||
return
|
||||
|
||||
try:
|
||||
callback(device, advertisement_data)
|
||||
except Exception: # pylint: disable=broad-except
|
||||
_LOGGER.exception("Error in callback: %s", callback)
|
||||
|
||||
|
||||
class HaBleakScanner(BleakScanner): # type: ignore[misc]
|
||||
"""BleakScanner that cannot be stopped."""
|
||||
|
||||
def __init__(self, *args: Any, **kwargs: Any) -> None:
|
||||
"""Initialize the BleakScanner."""
|
||||
self._callbacks: list[
|
||||
tuple[AdvertisementDataCallback, dict[str, set[str]]]
|
||||
] = []
|
||||
self._history: LRU = LRU(MAX_HISTORY_SIZE)
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
@hass_callback
|
||||
def async_register_callback(
|
||||
self, callback: AdvertisementDataCallback, filters: dict[str, set[str]]
|
||||
) -> CALLBACK_TYPE:
|
||||
"""Register a callback."""
|
||||
callback_entry = (callback, filters)
|
||||
self._callbacks.append(callback_entry)
|
||||
|
||||
@hass_callback
|
||||
def _remove_callback() -> None:
|
||||
self._callbacks.remove(callback_entry)
|
||||
|
||||
# Replay the history since otherwise we miss devices
|
||||
# that were already discovered before the callback was registered
|
||||
# or we are in passive mode
|
||||
for device, advertisement_data in self._history.values():
|
||||
_dispatch_callback(callback, filters, device, advertisement_data)
|
||||
|
||||
return _remove_callback
|
||||
|
||||
def async_callback_dispatcher(
|
||||
self, device: BLEDevice, advertisement_data: AdvertisementData
|
||||
) -> None:
|
||||
"""Dispatch the callback.
|
||||
|
||||
Here we get the actual callback from bleak and dispatch
|
||||
it to all the wrapped HaBleakScannerWrapper classes
|
||||
"""
|
||||
self._history[device.address] = (device, advertisement_data)
|
||||
for callback_filters in self._callbacks:
|
||||
_dispatch_callback(*callback_filters, device, advertisement_data)
|
||||
|
||||
|
||||
class HaBleakScannerWrapper(BleakScanner): # type: ignore[misc]
|
||||
"""A wrapper that uses the single instance."""
|
||||
|
||||
def __init__(self, *args: Any, **kwargs: Any) -> None:
|
||||
"""Initialize the BleakScanner."""
|
||||
self._detection_cancel: CALLBACK_TYPE | None = None
|
||||
self._mapped_filters: dict[str, set[str]] = {}
|
||||
if "filters" in kwargs:
|
||||
self._mapped_filters = {k: set(v) for k, v in kwargs["filters"].items()}
|
||||
if "service_uuids" in kwargs:
|
||||
self._mapped_filters[FILTER_UUIDS] = set(kwargs["service_uuids"])
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
async def stop(self, *args: Any, **kwargs: Any) -> None:
|
||||
"""Stop scanning for devices."""
|
||||
return
|
||||
|
||||
async def start(self, *args: Any, **kwargs: Any) -> None:
|
||||
"""Start scanning for devices."""
|
||||
return
|
||||
|
||||
def _cancel_callback(self) -> None:
|
||||
"""Cancel callback."""
|
||||
if self._detection_cancel:
|
||||
self._detection_cancel()
|
||||
self._detection_cancel = None
|
||||
|
||||
@property
|
||||
def discovered_devices(self) -> list[BLEDevice]:
|
||||
"""Return a list of discovered devices."""
|
||||
assert HA_BLEAK_SCANNER is not None
|
||||
return cast(list[BLEDevice], HA_BLEAK_SCANNER.discovered_devices)
|
||||
|
||||
def register_detection_callback(self, callback: AdvertisementDataCallback) -> None:
|
||||
"""Register a callback that is called when a device is discovered or has a property changed.
|
||||
|
||||
This method takes the callback and registers it with the long running
|
||||
scanner.
|
||||
"""
|
||||
self._cancel_callback()
|
||||
super().register_detection_callback(callback)
|
||||
assert HA_BLEAK_SCANNER is not None
|
||||
self._detection_cancel = HA_BLEAK_SCANNER.async_register_callback(
|
||||
self._callback, self._mapped_filters
|
||||
)
|
||||
|
||||
def __del__(self) -> None:
|
||||
"""Delete the BleakScanner."""
|
||||
if self._detection_cancel:
|
||||
# Nothing to do if event loop is already closed
|
||||
with contextlib.suppress(RuntimeError):
|
||||
asyncio.get_running_loop().call_soon_threadsafe(self._detection_cancel)
|
13
homeassistant/components/bluetooth/usage.py
Normal file
13
homeassistant/components/bluetooth/usage.py
Normal file
@ -0,0 +1,13 @@
|
||||
"""bluetooth usage utility to handle multiple instances."""
|
||||
from __future__ import annotations
|
||||
|
||||
import bleak
|
||||
|
||||
from . import models
|
||||
from .models import HaBleakScanner, HaBleakScannerWrapper
|
||||
|
||||
|
||||
def install_multiple_bleak_catcher(hass_bleak_scanner: HaBleakScanner) -> None:
|
||||
"""Wrap the bleak classes to return the shared instance if multiple instances are detected."""
|
||||
models.HA_BLEAK_SCANNER = hass_bleak_scanner
|
||||
bleak.BleakScanner = HaBleakScannerWrapper
|
@ -5,6 +5,7 @@
|
||||
"requirements": ["PySwitchbot==0.14.0"],
|
||||
"config_flow": true,
|
||||
"codeowners": ["@danielhiversen", "@RenierM26"],
|
||||
"bluetooth": [{ "service_uuid": "cba20d00-224d-11e6-9fb8-0002a5d5c51b" }],
|
||||
"iot_class": "local_polling",
|
||||
"loggers": ["switchbot"]
|
||||
}
|
||||
|
@ -28,6 +28,7 @@ from .util import uuid as uuid_util
|
||||
from .util.decorator import Registry
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .components.bluetooth import BluetoothServiceInfo
|
||||
from .components.dhcp import DhcpServiceInfo
|
||||
from .components.hassio import HassioServiceInfo
|
||||
from .components.mqtt import MqttServiceInfo
|
||||
@ -37,6 +38,7 @@ if TYPE_CHECKING:
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
SOURCE_BLUETOOTH = "bluetooth"
|
||||
SOURCE_DHCP = "dhcp"
|
||||
SOURCE_DISCOVERY = "discovery"
|
||||
SOURCE_HASSIO = "hassio"
|
||||
@ -116,6 +118,7 @@ class ConfigEntryState(Enum):
|
||||
DEFAULT_DISCOVERY_UNIQUE_ID = "default_discovery_unique_id"
|
||||
DISCOVERY_NOTIFICATION_ID = "config_entry_discovery"
|
||||
DISCOVERY_SOURCES = {
|
||||
SOURCE_BLUETOOTH,
|
||||
SOURCE_DHCP,
|
||||
SOURCE_DISCOVERY,
|
||||
SOURCE_HOMEKIT,
|
||||
@ -1460,6 +1463,12 @@ class ConfigFlow(data_entry_flow.FlowHandler):
|
||||
reason=reason, description_placeholders=description_placeholders
|
||||
)
|
||||
|
||||
async def async_step_bluetooth(
|
||||
self, discovery_info: BluetoothServiceInfo
|
||||
) -> data_entry_flow.FlowResult:
|
||||
"""Handle a flow initialized by Bluetooth discovery."""
|
||||
return await self.async_step_discovery(dataclasses.asdict(discovery_info))
|
||||
|
||||
async def async_step_dhcp(
|
||||
self, discovery_info: DhcpServiceInfo
|
||||
) -> data_entry_flow.FlowResult:
|
||||
|
14
homeassistant/generated/bluetooth.py
Normal file
14
homeassistant/generated/bluetooth.py
Normal file
@ -0,0 +1,14 @@
|
||||
"""Automatically generated by hassfest.
|
||||
|
||||
To update, run python3 -m script.hassfest
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
# fmt: off
|
||||
|
||||
BLUETOOTH: list[dict[str, str | int]] = [
|
||||
{
|
||||
"domain": "switchbot",
|
||||
"service_uuid": "cba20d00-224d-11e6-9fb8-0002a5d5c51b"
|
||||
}
|
||||
]
|
@ -6,7 +6,7 @@ import logging
|
||||
from typing import TYPE_CHECKING, Any, Generic, TypeVar, Union, cast
|
||||
|
||||
from homeassistant import config_entries
|
||||
from homeassistant.components import dhcp, onboarding, ssdp, zeroconf
|
||||
from homeassistant.components import bluetooth, dhcp, onboarding, ssdp, zeroconf
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.data_entry_flow import FlowResult
|
||||
|
||||
@ -92,6 +92,17 @@ class DiscoveryFlowHandler(config_entries.ConfigFlow, Generic[_R]):
|
||||
|
||||
return await self.async_step_confirm()
|
||||
|
||||
async def async_step_bluetooth(
|
||||
self, discovery_info: bluetooth.BluetoothServiceInfo
|
||||
) -> FlowResult:
|
||||
"""Handle a flow initialized by bluetooth discovery."""
|
||||
if self._async_in_progress() or self._async_current_entries():
|
||||
return self.async_abort(reason="single_instance_allowed")
|
||||
|
||||
await self.async_set_unique_id(self._domain)
|
||||
|
||||
return await self.async_step_confirm()
|
||||
|
||||
async def async_step_dhcp(self, discovery_info: dhcp.DhcpServiceInfo) -> FlowResult:
|
||||
"""Handle a flow initialized by dhcp discovery."""
|
||||
if self._async_in_progress() or self._async_current_entries():
|
||||
|
@ -24,6 +24,7 @@ from awesomeversion import (
|
||||
)
|
||||
|
||||
from .generated.application_credentials import APPLICATION_CREDENTIALS
|
||||
from .generated.bluetooth import BLUETOOTH
|
||||
from .generated.dhcp import DHCP
|
||||
from .generated.mqtt import MQTT
|
||||
from .generated.ssdp import SSDP
|
||||
@ -77,6 +78,25 @@ class DHCPMatcher(DHCPMatcherRequired, DHCPMatcherOptional):
|
||||
"""Matcher for the dhcp integration."""
|
||||
|
||||
|
||||
class BluetoothMatcherRequired(TypedDict, total=True):
|
||||
"""Matcher for the bluetooth integration for required fields."""
|
||||
|
||||
domain: str
|
||||
|
||||
|
||||
class BluetoothMatcherOptional(TypedDict, total=False):
|
||||
"""Matcher for the bluetooth integration for optional fields."""
|
||||
|
||||
local_name: str
|
||||
service_uuid: str
|
||||
manufacturer_id: int
|
||||
manufacturer_data_first_byte: int
|
||||
|
||||
|
||||
class BluetoothMatcher(BluetoothMatcherRequired, BluetoothMatcherOptional):
|
||||
"""Matcher for the bluetooth integration."""
|
||||
|
||||
|
||||
class Manifest(TypedDict, total=False):
|
||||
"""
|
||||
Integration manifest.
|
||||
@ -97,6 +117,7 @@ class Manifest(TypedDict, total=False):
|
||||
issue_tracker: str
|
||||
quality_scale: str
|
||||
iot_class: str
|
||||
bluetooth: list[dict[str, int | str]]
|
||||
mqtt: list[str]
|
||||
ssdp: list[dict[str, str]]
|
||||
zeroconf: list[str | dict[str, str]]
|
||||
@ -269,6 +290,22 @@ async def async_get_zeroconf(
|
||||
return zeroconf
|
||||
|
||||
|
||||
async def async_get_bluetooth(hass: HomeAssistant) -> list[BluetoothMatcher]:
|
||||
"""Return cached list of bluetooth types."""
|
||||
bluetooth = cast(list[BluetoothMatcher], BLUETOOTH.copy())
|
||||
|
||||
integrations = await async_get_custom_components(hass)
|
||||
for integration in integrations.values():
|
||||
if not integration.bluetooth:
|
||||
continue
|
||||
for entry in integration.bluetooth:
|
||||
bluetooth.append(
|
||||
cast(BluetoothMatcher, {"domain": integration.domain, **entry})
|
||||
)
|
||||
|
||||
return bluetooth
|
||||
|
||||
|
||||
async def async_get_dhcp(hass: HomeAssistant) -> list[DHCPMatcher]:
|
||||
"""Return cached list of dhcp types."""
|
||||
dhcp = cast(list[DHCPMatcher], DHCP.copy())
|
||||
@ -519,6 +556,11 @@ class Integration:
|
||||
"""Return Integration zeroconf entries."""
|
||||
return self.manifest.get("zeroconf")
|
||||
|
||||
@property
|
||||
def bluetooth(self) -> list[dict[str, str | int]] | None:
|
||||
"""Return Integration bluetooth entries."""
|
||||
return self.manifest.get("bluetooth")
|
||||
|
||||
@property
|
||||
def dhcp(self) -> list[dict[str, str | bool]] | None:
|
||||
"""Return Integration dhcp entries."""
|
||||
|
11
mypy.ini
11
mypy.ini
@ -390,6 +390,17 @@ no_implicit_optional = true
|
||||
warn_return_any = true
|
||||
warn_unreachable = true
|
||||
|
||||
[mypy-homeassistant.components.bluetooth.*]
|
||||
check_untyped_defs = true
|
||||
disallow_incomplete_defs = true
|
||||
disallow_subclassing_any = true
|
||||
disallow_untyped_calls = true
|
||||
disallow_untyped_decorators = true
|
||||
disallow_untyped_defs = true
|
||||
no_implicit_optional = true
|
||||
warn_return_any = true
|
||||
warn_unreachable = true
|
||||
|
||||
[mypy-homeassistant.components.bluetooth_tracker.*]
|
||||
check_untyped_defs = true
|
||||
disallow_incomplete_defs = true
|
||||
|
@ -401,6 +401,9 @@ bimmer_connected==0.9.6
|
||||
# homeassistant.components.bizkaibus
|
||||
bizkaibus==0.1.1
|
||||
|
||||
# homeassistant.components.bluetooth
|
||||
bleak==0.14.3
|
||||
|
||||
# homeassistant.components.blebox
|
||||
blebox_uniapi==2.0.1
|
||||
|
||||
|
@ -316,6 +316,9 @@ bellows==0.31.0
|
||||
# homeassistant.components.bmw_connected_drive
|
||||
bimmer_connected==0.9.6
|
||||
|
||||
# homeassistant.components.bluetooth
|
||||
bleak==0.14.3
|
||||
|
||||
# homeassistant.components.blebox
|
||||
blebox_uniapi==2.0.1
|
||||
|
||||
|
@ -6,6 +6,7 @@ from time import monotonic
|
||||
|
||||
from . import (
|
||||
application_credentials,
|
||||
bluetooth,
|
||||
codeowners,
|
||||
config_flow,
|
||||
coverage,
|
||||
@ -27,6 +28,7 @@ from .model import Config, Integration
|
||||
|
||||
INTEGRATION_PLUGINS = [
|
||||
application_credentials,
|
||||
bluetooth,
|
||||
codeowners,
|
||||
config_flow,
|
||||
dependencies,
|
||||
|
65
script/hassfest/bluetooth.py
Normal file
65
script/hassfest/bluetooth.py
Normal file
@ -0,0 +1,65 @@
|
||||
"""Generate bluetooth file."""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
|
||||
from .model import Config, Integration
|
||||
|
||||
BASE = """
|
||||
\"\"\"Automatically generated by hassfest.
|
||||
|
||||
To update, run python3 -m script.hassfest
|
||||
\"\"\"
|
||||
from __future__ import annotations
|
||||
|
||||
# fmt: off
|
||||
|
||||
BLUETOOTH: list[dict[str, str | int]] = {}
|
||||
""".strip()
|
||||
|
||||
|
||||
def generate_and_validate(integrations: list[dict[str, str]]):
|
||||
"""Validate and generate bluetooth data."""
|
||||
match_list = []
|
||||
|
||||
for domain in sorted(integrations):
|
||||
integration = integrations[domain]
|
||||
|
||||
if not integration.manifest or not integration.config_flow:
|
||||
continue
|
||||
|
||||
match_types = integration.manifest.get("bluetooth", [])
|
||||
|
||||
if not match_types:
|
||||
continue
|
||||
|
||||
for entry in match_types:
|
||||
match_list.append({"domain": domain, **entry})
|
||||
|
||||
return BASE.format(json.dumps(match_list, indent=4))
|
||||
|
||||
|
||||
def validate(integrations: dict[str, Integration], config: Config):
|
||||
"""Validate bluetooth file."""
|
||||
bluetooth_path = config.root / "homeassistant/generated/bluetooth.py"
|
||||
config.cache["bluetooth"] = content = generate_and_validate(integrations)
|
||||
|
||||
if config.specific_integrations:
|
||||
return
|
||||
|
||||
with open(str(bluetooth_path)) as fp:
|
||||
current = fp.read().strip()
|
||||
if current != content:
|
||||
config.add_error(
|
||||
"bluetooth",
|
||||
"File bluetooth.py is not up to date. Run python3 -m script.hassfest",
|
||||
fixable=True,
|
||||
)
|
||||
return
|
||||
|
||||
|
||||
def generate(integrations: dict[str, Integration], config: Config):
|
||||
"""Generate bluetooth file."""
|
||||
bluetooth_path = config.root / "homeassistant/generated/bluetooth.py"
|
||||
with open(str(bluetooth_path), "w") as fp:
|
||||
fp.write(f"{config.cache['bluetooth']}\n")
|
@ -35,6 +35,7 @@ def validate_integration(config: Config, integration: Integration):
|
||||
|
||||
needs_unique_id = integration.domain not in UNIQUE_ID_IGNORE and (
|
||||
"async_step_discovery" in config_flow
|
||||
or "async_step_bluetooth" in config_flow
|
||||
or "async_step_hassio" in config_flow
|
||||
or "async_step_homekit" in config_flow
|
||||
or "async_step_mqtt" in config_flow
|
||||
|
@ -190,6 +190,16 @@ MANIFEST_SCHEMA = vol.Schema(
|
||||
vol.Optional("ssdp"): vol.Schema(
|
||||
vol.All([vol.All(vol.Schema({}, extra=vol.ALLOW_EXTRA), vol.Length(min=1))])
|
||||
),
|
||||
vol.Optional("bluetooth"): [
|
||||
vol.Schema(
|
||||
{
|
||||
vol.Optional("service_uuid"): vol.All(str, verify_lowercase),
|
||||
vol.Optional("local_name"): vol.All(str),
|
||||
vol.Optional("manufacturer_id"): int,
|
||||
vol.Optional("manufacturer_data_first_byte"): int,
|
||||
}
|
||||
)
|
||||
],
|
||||
vol.Optional("homekit"): vol.Schema({vol.Optional("models"): [str]}),
|
||||
vol.Optional("dhcp"): [
|
||||
vol.Schema(
|
||||
|
1
tests/components/bluetooth/__init__.py
Normal file
1
tests/components/bluetooth/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
"""Tests for the Bluetooth integration."""
|
25
tests/components/bluetooth/conftest.py
Normal file
25
tests/components/bluetooth/conftest.py
Normal file
@ -0,0 +1,25 @@
|
||||
"""Tests for the bluetooth component."""
|
||||
|
||||
import threading
|
||||
|
||||
import pytest
|
||||
|
||||
from tests.common import INSTANCES
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def verify_cleanup():
|
||||
"""Verify that the test has cleaned up resources correctly."""
|
||||
threads_before = frozenset(threading.enumerate())
|
||||
|
||||
yield
|
||||
|
||||
if len(INSTANCES) >= 2:
|
||||
count = len(INSTANCES)
|
||||
for inst in INSTANCES:
|
||||
inst.stop()
|
||||
pytest.exit(f"Detected non stopped instances ({count}), aborting test run")
|
||||
|
||||
threads = frozenset(threading.enumerate()) - threads_before
|
||||
for thread in threads:
|
||||
assert isinstance(thread, threading._DummyThread)
|
440
tests/components/bluetooth/test_init.py
Normal file
440
tests/components/bluetooth/test_init.py
Normal file
@ -0,0 +1,440 @@
|
||||
"""Tests for the Bluetooth integration."""
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import bleak
|
||||
from bleak import BleakError
|
||||
from bleak.backends.scanner import AdvertisementData, BLEDevice
|
||||
import pytest
|
||||
|
||||
from homeassistant.components import bluetooth
|
||||
from homeassistant.components.bluetooth import (
|
||||
BluetoothChange,
|
||||
BluetoothServiceInfo,
|
||||
models,
|
||||
)
|
||||
from homeassistant.const import EVENT_HOMEASSISTANT_STARTED, EVENT_HOMEASSISTANT_STOP
|
||||
from homeassistant.setup import async_setup_component
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def mock_bleak_scanner_start():
|
||||
"""Fixture to mock starting the bleak scanner."""
|
||||
scanner = bleak.BleakScanner
|
||||
models.HA_BLEAK_SCANNER = None
|
||||
|
||||
with patch("homeassistant.components.bluetooth.HaBleakScanner.stop"), patch(
|
||||
"homeassistant.components.bluetooth.HaBleakScanner.start",
|
||||
) as mock_bleak_scanner_start:
|
||||
yield mock_bleak_scanner_start
|
||||
|
||||
# We need to drop the stop method from the object since we patched
|
||||
# out start and this fixture will expire before the stop method is called
|
||||
# when EVENT_HOMEASSISTANT_STOP is fired.
|
||||
if models.HA_BLEAK_SCANNER:
|
||||
models.HA_BLEAK_SCANNER.stop = AsyncMock()
|
||||
bleak.BleakScanner = scanner
|
||||
|
||||
|
||||
async def test_setup_and_stop(hass, mock_bleak_scanner_start):
|
||||
"""Test we and setup and stop the scanner."""
|
||||
mock_bt = [
|
||||
{"domain": "switchbot", "service_uuid": "cba20d00-224d-11e6-9fb8-0002a5d5c51b"}
|
||||
]
|
||||
with patch(
|
||||
"homeassistant.components.bluetooth.async_get_bluetooth", return_value=mock_bt
|
||||
), patch.object(hass.config_entries.flow, "async_init"):
|
||||
assert await async_setup_component(
|
||||
hass, bluetooth.DOMAIN, {bluetooth.DOMAIN: {}}
|
||||
)
|
||||
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP)
|
||||
await hass.async_block_till_done()
|
||||
assert len(mock_bleak_scanner_start.mock_calls) == 1
|
||||
|
||||
|
||||
async def test_setup_and_stop_no_bluetooth(hass, caplog):
|
||||
"""Test we fail gracefully when bluetooth is not available."""
|
||||
mock_bt = [
|
||||
{"domain": "switchbot", "service_uuid": "cba20d00-224d-11e6-9fb8-0002a5d5c51b"}
|
||||
]
|
||||
with patch(
|
||||
"homeassistant.components.bluetooth.HaBleakScanner", side_effect=BleakError
|
||||
) as mock_ha_bleak_scanner, patch(
|
||||
"homeassistant.components.bluetooth.async_get_bluetooth", return_value=mock_bt
|
||||
), patch.object(
|
||||
hass.config_entries.flow, "async_init"
|
||||
):
|
||||
assert await async_setup_component(
|
||||
hass, bluetooth.DOMAIN, {bluetooth.DOMAIN: {}}
|
||||
)
|
||||
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP)
|
||||
await hass.async_block_till_done()
|
||||
assert len(mock_ha_bleak_scanner.mock_calls) == 1
|
||||
assert "Could not create bluetooth scanner" in caplog.text
|
||||
|
||||
|
||||
async def test_discovery_match_by_service_uuid(hass, mock_bleak_scanner_start):
|
||||
"""Test bluetooth discovery match by service_uuid."""
|
||||
mock_bt = [
|
||||
{"domain": "switchbot", "service_uuid": "cba20d00-224d-11e6-9fb8-0002a5d5c51b"}
|
||||
]
|
||||
with patch(
|
||||
"homeassistant.components.bluetooth.async_get_bluetooth", return_value=mock_bt
|
||||
), patch.object(hass.config_entries.flow, "async_init") as mock_config_flow:
|
||||
assert await async_setup_component(
|
||||
hass, bluetooth.DOMAIN, {bluetooth.DOMAIN: {}}
|
||||
)
|
||||
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert len(mock_bleak_scanner_start.mock_calls) == 1
|
||||
|
||||
wrong_device = BLEDevice("44:44:33:11:23:45", "wrong_name")
|
||||
wrong_adv = AdvertisementData(local_name="wrong_name", service_uuids=[])
|
||||
|
||||
models.HA_BLEAK_SCANNER._callback(wrong_device, wrong_adv)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert len(mock_config_flow.mock_calls) == 0
|
||||
|
||||
switchbot_device = BLEDevice("44:44:33:11:23:45", "wohand")
|
||||
switchbot_adv = AdvertisementData(
|
||||
local_name="wohand", service_uuids=["cba20d00-224d-11e6-9fb8-0002a5d5c51b"]
|
||||
)
|
||||
|
||||
models.HA_BLEAK_SCANNER._callback(switchbot_device, switchbot_adv)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert len(mock_config_flow.mock_calls) == 1
|
||||
assert mock_config_flow.mock_calls[0][1][0] == "switchbot"
|
||||
|
||||
|
||||
async def test_discovery_match_by_local_name(hass, mock_bleak_scanner_start):
|
||||
"""Test bluetooth discovery match by local_name."""
|
||||
mock_bt = [{"domain": "switchbot", "local_name": "wohand"}]
|
||||
with patch(
|
||||
"homeassistant.components.bluetooth.async_get_bluetooth", return_value=mock_bt
|
||||
), patch.object(hass.config_entries.flow, "async_init") as mock_config_flow:
|
||||
assert await async_setup_component(
|
||||
hass, bluetooth.DOMAIN, {bluetooth.DOMAIN: {}}
|
||||
)
|
||||
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert len(mock_bleak_scanner_start.mock_calls) == 1
|
||||
|
||||
wrong_device = BLEDevice("44:44:33:11:23:45", "wrong_name")
|
||||
wrong_adv = AdvertisementData(local_name="wrong_name", service_uuids=[])
|
||||
|
||||
models.HA_BLEAK_SCANNER._callback(wrong_device, wrong_adv)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert len(mock_config_flow.mock_calls) == 0
|
||||
|
||||
switchbot_device = BLEDevice("44:44:33:11:23:45", "wohand")
|
||||
switchbot_adv = AdvertisementData(local_name="wohand", service_uuids=[])
|
||||
|
||||
models.HA_BLEAK_SCANNER._callback(switchbot_device, switchbot_adv)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert len(mock_config_flow.mock_calls) == 1
|
||||
assert mock_config_flow.mock_calls[0][1][0] == "switchbot"
|
||||
|
||||
|
||||
async def test_discovery_match_by_manufacturer_id_and_first_byte(
|
||||
hass, mock_bleak_scanner_start
|
||||
):
|
||||
"""Test bluetooth discovery match by manufacturer_id and manufacturer_data_first_byte."""
|
||||
mock_bt = [
|
||||
{
|
||||
"domain": "homekit_controller",
|
||||
"manufacturer_id": 76,
|
||||
"manufacturer_data_first_byte": 0x06,
|
||||
}
|
||||
]
|
||||
with patch(
|
||||
"homeassistant.components.bluetooth.async_get_bluetooth", return_value=mock_bt
|
||||
), patch.object(hass.config_entries.flow, "async_init") as mock_config_flow:
|
||||
assert await async_setup_component(
|
||||
hass, bluetooth.DOMAIN, {bluetooth.DOMAIN: {}}
|
||||
)
|
||||
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert len(mock_bleak_scanner_start.mock_calls) == 1
|
||||
|
||||
hkc_device = BLEDevice("44:44:33:11:23:45", "lock")
|
||||
hkc_adv = AdvertisementData(
|
||||
local_name="lock", service_uuids=[], manufacturer_data={76: b"\x06"}
|
||||
)
|
||||
|
||||
models.HA_BLEAK_SCANNER._callback(hkc_device, hkc_adv)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert len(mock_config_flow.mock_calls) == 1
|
||||
assert mock_config_flow.mock_calls[0][1][0] == "homekit_controller"
|
||||
mock_config_flow.reset_mock()
|
||||
|
||||
# 2nd discovery should not generate another flow
|
||||
models.HA_BLEAK_SCANNER._callback(hkc_device, hkc_adv)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert len(mock_config_flow.mock_calls) == 0
|
||||
|
||||
mock_config_flow.reset_mock()
|
||||
not_hkc_device = BLEDevice("44:44:33:11:23:21", "lock")
|
||||
not_hkc_adv = AdvertisementData(
|
||||
local_name="lock", service_uuids=[], manufacturer_data={76: b"\x02"}
|
||||
)
|
||||
|
||||
models.HA_BLEAK_SCANNER._callback(not_hkc_device, not_hkc_adv)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert len(mock_config_flow.mock_calls) == 0
|
||||
not_apple_device = BLEDevice("44:44:33:11:23:23", "lock")
|
||||
not_apple_adv = AdvertisementData(
|
||||
local_name="lock", service_uuids=[], manufacturer_data={21: b"\x02"}
|
||||
)
|
||||
|
||||
models.HA_BLEAK_SCANNER._callback(not_apple_device, not_apple_adv)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert len(mock_config_flow.mock_calls) == 0
|
||||
|
||||
|
||||
async def test_register_callbacks(hass, mock_bleak_scanner_start):
|
||||
"""Test configured options for a device are loaded via config entry."""
|
||||
mock_bt = []
|
||||
callbacks = []
|
||||
|
||||
def _fake_subscriber(
|
||||
service_info: BluetoothServiceInfo, change: BluetoothChange
|
||||
) -> None:
|
||||
"""Fake subscriber for the BleakScanner."""
|
||||
callbacks.append((service_info, change))
|
||||
if len(callbacks) >= 3:
|
||||
raise ValueError
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.bluetooth.async_get_bluetooth", return_value=mock_bt
|
||||
), patch.object(hass.config_entries.flow, "async_init"):
|
||||
assert await async_setup_component(
|
||||
hass, bluetooth.DOMAIN, {bluetooth.DOMAIN: {}}
|
||||
)
|
||||
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
cancel = bluetooth.async_register_callback(
|
||||
hass,
|
||||
_fake_subscriber,
|
||||
{"service_uuids": {"cba20d00-224d-11e6-9fb8-0002a5d5c51b"}},
|
||||
)
|
||||
|
||||
assert len(mock_bleak_scanner_start.mock_calls) == 1
|
||||
|
||||
switchbot_device = BLEDevice("44:44:33:11:23:45", "wohand")
|
||||
switchbot_adv = AdvertisementData(
|
||||
local_name="wohand",
|
||||
service_uuids=["cba20d00-224d-11e6-9fb8-0002a5d5c51b"],
|
||||
manufacturer_data={89: b"\xd8.\xad\xcd\r\x85"},
|
||||
service_data={"00000d00-0000-1000-8000-00805f9b34fb": b"H\x10c"},
|
||||
)
|
||||
|
||||
models.HA_BLEAK_SCANNER._callback(switchbot_device, switchbot_adv)
|
||||
|
||||
empty_device = BLEDevice("11:22:33:44:55:66", "empty")
|
||||
empty_adv = AdvertisementData(local_name="empty")
|
||||
|
||||
models.HA_BLEAK_SCANNER._callback(empty_device, empty_adv)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
empty_device = BLEDevice("11:22:33:44:55:66", "empty")
|
||||
empty_adv = AdvertisementData(local_name="empty")
|
||||
|
||||
# 3rd callback raises ValueError but is still tracked
|
||||
models.HA_BLEAK_SCANNER._callback(empty_device, empty_adv)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
cancel()
|
||||
|
||||
# 4th callback should not be tracked since we canceled
|
||||
models.HA_BLEAK_SCANNER._callback(empty_device, empty_adv)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert len(callbacks) == 3
|
||||
|
||||
service_info: BluetoothServiceInfo = callbacks[0][0]
|
||||
assert service_info.name == "wohand"
|
||||
assert service_info.manufacturer == "Nordic Semiconductor ASA"
|
||||
assert service_info.manufacturer_id == 89
|
||||
|
||||
service_info: BluetoothServiceInfo = callbacks[1][0]
|
||||
assert service_info.name == "empty"
|
||||
assert service_info.manufacturer is None
|
||||
assert service_info.manufacturer_id is None
|
||||
|
||||
service_info: BluetoothServiceInfo = callbacks[2][0]
|
||||
assert service_info.name == "empty"
|
||||
assert service_info.manufacturer is None
|
||||
assert service_info.manufacturer_id is None
|
||||
|
||||
|
||||
async def test_wrapped_instance_with_filter(hass, mock_bleak_scanner_start):
|
||||
"""Test consumers can use the wrapped instance with a filter as if it was normal BleakScanner."""
|
||||
with patch(
|
||||
"homeassistant.components.bluetooth.async_get_bluetooth", return_value=[]
|
||||
), patch.object(hass.config_entries.flow, "async_init"):
|
||||
assert await async_setup_component(
|
||||
hass, bluetooth.DOMAIN, {bluetooth.DOMAIN: {}}
|
||||
)
|
||||
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
detected = []
|
||||
|
||||
def _device_detected(
|
||||
device: BLEDevice, advertisement_data: AdvertisementData
|
||||
) -> None:
|
||||
"""Handle a detected device."""
|
||||
detected.append((device, advertisement_data))
|
||||
|
||||
switchbot_device = BLEDevice("44:44:33:11:23:45", "wohand")
|
||||
switchbot_adv = AdvertisementData(
|
||||
local_name="wohand",
|
||||
service_uuids=["cba20d00-224d-11e6-9fb8-0002a5d5c51b"],
|
||||
manufacturer_data={89: b"\xd8.\xad\xcd\r\x85"},
|
||||
service_data={"00000d00-0000-1000-8000-00805f9b34fb": b"H\x10c"},
|
||||
)
|
||||
empty_device = BLEDevice("11:22:33:44:55:66", "empty")
|
||||
empty_adv = AdvertisementData(local_name="empty")
|
||||
|
||||
assert models.HA_BLEAK_SCANNER is not None
|
||||
scanner = models.HaBleakScannerWrapper(
|
||||
filters={"UUIDs": ["cba20d00-224d-11e6-9fb8-0002a5d5c51b"]}
|
||||
)
|
||||
scanner.register_detection_callback(_device_detected)
|
||||
|
||||
mock_discovered = [MagicMock()]
|
||||
type(models.HA_BLEAK_SCANNER).discovered_devices = mock_discovered
|
||||
models.HA_BLEAK_SCANNER._callback(switchbot_device, switchbot_adv)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
discovered = await scanner.discover(timeout=0)
|
||||
assert len(discovered) == 1
|
||||
assert discovered == mock_discovered
|
||||
assert len(detected) == 1
|
||||
|
||||
scanner.register_detection_callback(_device_detected)
|
||||
# We should get a reply from the history when we register again
|
||||
assert len(detected) == 2
|
||||
scanner.register_detection_callback(_device_detected)
|
||||
# We should get a reply from the history when we register again
|
||||
assert len(detected) == 3
|
||||
|
||||
type(models.HA_BLEAK_SCANNER).discovered_devices = []
|
||||
discovered = await scanner.discover(timeout=0)
|
||||
assert len(discovered) == 0
|
||||
assert discovered == []
|
||||
|
||||
models.HA_BLEAK_SCANNER._callback(switchbot_device, switchbot_adv)
|
||||
assert len(detected) == 4
|
||||
|
||||
# The filter we created in the wrapped scanner with should be respected
|
||||
# and we should not get another callback
|
||||
models.HA_BLEAK_SCANNER._callback(empty_device, empty_adv)
|
||||
assert len(detected) == 4
|
||||
|
||||
|
||||
async def test_wrapped_instance_with_service_uuids(hass, mock_bleak_scanner_start):
|
||||
"""Test consumers can use the wrapped instance with a service_uuids list as if it was normal BleakScanner."""
|
||||
with patch(
|
||||
"homeassistant.components.bluetooth.async_get_bluetooth", return_value=[]
|
||||
), patch.object(hass.config_entries.flow, "async_init"):
|
||||
assert await async_setup_component(
|
||||
hass, bluetooth.DOMAIN, {bluetooth.DOMAIN: {}}
|
||||
)
|
||||
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
detected = []
|
||||
|
||||
def _device_detected(
|
||||
device: BLEDevice, advertisement_data: AdvertisementData
|
||||
) -> None:
|
||||
"""Handle a detected device."""
|
||||
detected.append((device, advertisement_data))
|
||||
|
||||
switchbot_device = BLEDevice("44:44:33:11:23:45", "wohand")
|
||||
switchbot_adv = AdvertisementData(
|
||||
local_name="wohand",
|
||||
service_uuids=["cba20d00-224d-11e6-9fb8-0002a5d5c51b"],
|
||||
manufacturer_data={89: b"\xd8.\xad\xcd\r\x85"},
|
||||
service_data={"00000d00-0000-1000-8000-00805f9b34fb": b"H\x10c"},
|
||||
)
|
||||
empty_device = BLEDevice("11:22:33:44:55:66", "empty")
|
||||
empty_adv = AdvertisementData(local_name="empty")
|
||||
|
||||
assert models.HA_BLEAK_SCANNER is not None
|
||||
scanner = models.HaBleakScannerWrapper(
|
||||
service_uuids=["cba20d00-224d-11e6-9fb8-0002a5d5c51b"]
|
||||
)
|
||||
scanner.register_detection_callback(_device_detected)
|
||||
|
||||
type(models.HA_BLEAK_SCANNER).discovered_devices = [MagicMock()]
|
||||
for _ in range(2):
|
||||
models.HA_BLEAK_SCANNER._callback(switchbot_device, switchbot_adv)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert len(detected) == 2
|
||||
|
||||
# The UUIDs list we created in the wrapped scanner with should be respected
|
||||
# and we should not get another callback
|
||||
models.HA_BLEAK_SCANNER._callback(empty_device, empty_adv)
|
||||
assert len(detected) == 2
|
||||
|
||||
|
||||
async def test_wrapped_instance_with_broken_callbacks(hass, mock_bleak_scanner_start):
|
||||
"""Test broken callbacks do not cause the scanner to fail."""
|
||||
with patch(
|
||||
"homeassistant.components.bluetooth.async_get_bluetooth", return_value=[]
|
||||
), patch.object(hass.config_entries.flow, "async_init"):
|
||||
assert await async_setup_component(
|
||||
hass, bluetooth.DOMAIN, {bluetooth.DOMAIN: {}}
|
||||
)
|
||||
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
detected = []
|
||||
|
||||
def _device_detected(
|
||||
device: BLEDevice, advertisement_data: AdvertisementData
|
||||
) -> None:
|
||||
"""Handle a detected device."""
|
||||
if detected:
|
||||
raise ValueError
|
||||
detected.append((device, advertisement_data))
|
||||
|
||||
switchbot_device = BLEDevice("44:44:33:11:23:45", "wohand")
|
||||
switchbot_adv = AdvertisementData(
|
||||
local_name="wohand",
|
||||
service_uuids=["cba20d00-224d-11e6-9fb8-0002a5d5c51b"],
|
||||
manufacturer_data={89: b"\xd8.\xad\xcd\r\x85"},
|
||||
service_data={"00000d00-0000-1000-8000-00805f9b34fb": b"H\x10c"},
|
||||
)
|
||||
|
||||
assert models.HA_BLEAK_SCANNER is not None
|
||||
scanner = models.HaBleakScannerWrapper(
|
||||
service_uuids=["cba20d00-224d-11e6-9fb8-0002a5d5c51b"]
|
||||
)
|
||||
scanner.register_detection_callback(_device_detected)
|
||||
|
||||
models.HA_BLEAK_SCANNER._callback(switchbot_device, switchbot_adv)
|
||||
await hass.async_block_till_done()
|
||||
models.HA_BLEAK_SCANNER._callback(switchbot_device, switchbot_adv)
|
||||
await hass.async_block_till_done()
|
||||
assert len(detected) == 1
|
22
tests/components/bluetooth/test_usage.py
Normal file
22
tests/components/bluetooth/test_usage.py
Normal file
@ -0,0 +1,22 @@
|
||||
"""Tests for the Bluetooth integration."""
|
||||
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import bleak
|
||||
|
||||
from homeassistant.components.bluetooth import models
|
||||
from homeassistant.components.bluetooth.models import HaBleakScannerWrapper
|
||||
from homeassistant.components.bluetooth.usage import install_multiple_bleak_catcher
|
||||
|
||||
|
||||
async def test_multiple_bleak_scanner_instances(hass):
|
||||
"""Test creating multiple zeroconf throws without an integration."""
|
||||
assert models.HA_BLEAK_SCANNER is None
|
||||
mock_scanner = MagicMock()
|
||||
|
||||
install_multiple_bleak_catcher(mock_scanner)
|
||||
|
||||
instance = bleak.BleakScanner()
|
||||
|
||||
assert isinstance(instance, HaBleakScannerWrapper)
|
||||
assert models.HA_BLEAK_SCANNER is mock_scanner
|
@ -94,6 +94,7 @@ async def test_user_has_confirmation(hass, discovery_flow_conf):
|
||||
@pytest.mark.parametrize(
|
||||
"source",
|
||||
[
|
||||
config_entries.SOURCE_BLUETOOTH,
|
||||
config_entries.SOURCE_DISCOVERY,
|
||||
config_entries.SOURCE_MQTT,
|
||||
config_entries.SOURCE_SSDP,
|
||||
@ -117,6 +118,7 @@ async def test_discovery_single_instance(hass, discovery_flow_conf, source):
|
||||
@pytest.mark.parametrize(
|
||||
"source",
|
||||
[
|
||||
config_entries.SOURCE_BLUETOOTH,
|
||||
config_entries.SOURCE_DISCOVERY,
|
||||
config_entries.SOURCE_MQTT,
|
||||
config_entries.SOURCE_SSDP,
|
||||
@ -142,6 +144,7 @@ async def test_discovery_confirmation(hass, discovery_flow_conf, source):
|
||||
@pytest.mark.parametrize(
|
||||
"source",
|
||||
[
|
||||
config_entries.SOURCE_BLUETOOTH,
|
||||
config_entries.SOURCE_DISCOVERY,
|
||||
config_entries.SOURCE_MQTT,
|
||||
config_entries.SOURCE_SSDP,
|
||||
|
@ -2497,6 +2497,7 @@ async def test_async_setup_update_entry(hass):
|
||||
@pytest.mark.parametrize(
|
||||
"discovery_source",
|
||||
(
|
||||
(config_entries.SOURCE_BLUETOOTH, BaseServiceInfo()),
|
||||
(config_entries.SOURCE_DISCOVERY, {}),
|
||||
(config_entries.SOURCE_SSDP, BaseServiceInfo()),
|
||||
(config_entries.SOURCE_USB, BaseServiceInfo()),
|
||||
|
@ -205,6 +205,7 @@ def test_integration_properties(hass):
|
||||
{"hostname": "tesla_*", "macaddress": "98ED5C*"},
|
||||
{"registered_devices": True},
|
||||
],
|
||||
"bluetooth": [{"manufacturer_id": 76, "manufacturer_data_first_byte": 6}],
|
||||
"usb": [
|
||||
{"vid": "10C4", "pid": "EA60"},
|
||||
{"vid": "1CF1", "pid": "0030"},
|
||||
@ -242,6 +243,9 @@ def test_integration_properties(hass):
|
||||
{"vid": "1A86", "pid": "7523"},
|
||||
{"vid": "10C4", "pid": "8A2A"},
|
||||
]
|
||||
assert integration.bluetooth == [
|
||||
{"manufacturer_id": 76, "manufacturer_data_first_byte": 6}
|
||||
]
|
||||
assert integration.ssdp == [
|
||||
{
|
||||
"manufacturer": "Royal Philips Electronics",
|
||||
@ -274,6 +278,7 @@ def test_integration_properties(hass):
|
||||
assert integration.homekit is None
|
||||
assert integration.zeroconf is None
|
||||
assert integration.dhcp is None
|
||||
assert integration.bluetooth is None
|
||||
assert integration.usb is None
|
||||
assert integration.ssdp is None
|
||||
assert integration.mqtt is None
|
||||
@ -296,6 +301,7 @@ def test_integration_properties(hass):
|
||||
assert integration.zeroconf == [{"type": "_hue._tcp.local.", "name": "hue*"}]
|
||||
assert integration.dhcp is None
|
||||
assert integration.usb is None
|
||||
assert integration.bluetooth is None
|
||||
assert integration.ssdp is None
|
||||
|
||||
|
||||
@ -417,6 +423,25 @@ def _get_test_integration_with_dhcp_matcher(hass, name, config_flow):
|
||||
)
|
||||
|
||||
|
||||
def _get_test_integration_with_bluetooth_matcher(hass, name, config_flow):
|
||||
"""Return a generated test integration with a bluetooth matcher."""
|
||||
return loader.Integration(
|
||||
hass,
|
||||
f"homeassistant.components.{name}",
|
||||
None,
|
||||
{
|
||||
"name": name,
|
||||
"domain": name,
|
||||
"config_flow": config_flow,
|
||||
"bluetooth": [
|
||||
{
|
||||
"local_name": "Prodigio_*",
|
||||
},
|
||||
],
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
def _get_test_integration_with_usb_matcher(hass, name, config_flow):
|
||||
"""Return a generated test integration with a usb matcher."""
|
||||
return loader.Integration(
|
||||
@ -543,6 +568,26 @@ async def test_get_zeroconf_back_compat(hass):
|
||||
]
|
||||
|
||||
|
||||
async def test_get_bluetooth(hass):
|
||||
"""Verify that custom components with bluetooth are found."""
|
||||
test_1_integration = _get_test_integration_with_bluetooth_matcher(
|
||||
hass, "test_1", True
|
||||
)
|
||||
test_2_integration = _get_test_integration_with_dhcp_matcher(hass, "test_2", True)
|
||||
with patch("homeassistant.loader.async_get_custom_components") as mock_get:
|
||||
mock_get.return_value = {
|
||||
"test_1": test_1_integration,
|
||||
"test_2": test_2_integration,
|
||||
}
|
||||
bluetooth = await loader.async_get_bluetooth(hass)
|
||||
bluetooth_for_domain = [
|
||||
entry for entry in bluetooth if entry["domain"] == "test_1"
|
||||
]
|
||||
assert bluetooth_for_domain == [
|
||||
{"domain": "test_1", "local_name": "Prodigio_*"},
|
||||
]
|
||||
|
||||
|
||||
async def test_get_dhcp(hass):
|
||||
"""Verify that custom components with dhcp are found."""
|
||||
test_1_integration = _get_test_integration_with_dhcp_matcher(hass, "test_1", True)
|
||||
|
Loading…
x
Reference in New Issue
Block a user