mirror of
https://github.com/home-assistant/core.git
synced 2025-07-13 00:07:10 +00:00
Move bluetooth remote scanner implementation into a base class (#82012)
This commit is contained in:
parent
bd28483b43
commit
f584efa0c2
@ -50,6 +50,7 @@ from .const import (
|
||||
from .manager import BluetoothManager
|
||||
from .match import BluetoothCallbackMatcher, IntegrationMatcher
|
||||
from .models import (
|
||||
BaseHaRemoteScanner,
|
||||
BaseHaScanner,
|
||||
BluetoothCallback,
|
||||
BluetoothChange,
|
||||
@ -80,6 +81,7 @@ __all__ = [
|
||||
"async_track_unavailable",
|
||||
"async_scanner_count",
|
||||
"BaseHaScanner",
|
||||
"BaseHaRemoteScanner",
|
||||
"BluetoothServiceInfo",
|
||||
"BluetoothServiceInfoBleak",
|
||||
"BluetoothScanningMode",
|
||||
|
@ -6,6 +6,8 @@ import asyncio
|
||||
from collections.abc import Callable
|
||||
import contextlib
|
||||
from dataclasses import dataclass
|
||||
import datetime
|
||||
from datetime import timedelta
|
||||
from enum import Enum
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Any, Final
|
||||
@ -21,8 +23,21 @@ from bleak.backends.scanner import (
|
||||
from bleak_retry_connector import NO_RSSI_VALUE, freshen_ble_device
|
||||
|
||||
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback as hass_callback
|
||||
from homeassistant.helpers.event import async_track_time_interval
|
||||
from homeassistant.helpers.frame import report
|
||||
from homeassistant.helpers.service_info.bluetooth import BluetoothServiceInfo
|
||||
from homeassistant.util.dt import monotonic_time_coarse
|
||||
|
||||
from .const import FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS
|
||||
|
||||
# The maximum time between advertisements for a device to be considered
|
||||
# stale when the advertisement tracker can determine the interval for
|
||||
# connectable devices.
|
||||
#
|
||||
# BlueZ uses 180 seconds by default but we give it a bit more time
|
||||
# to account for the esp32's bluetooth stack being a bit slower
|
||||
# than BlueZ's.
|
||||
CONNECTABLE_FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS: Final = 195
|
||||
|
||||
if TYPE_CHECKING:
|
||||
|
||||
@ -35,6 +50,8 @@ FILTER_UUIDS: Final = "UUIDs"
|
||||
|
||||
MANAGER: BluetoothManager | None = None
|
||||
|
||||
MONOTONIC_TIME: Final = monotonic_time_coarse
|
||||
|
||||
|
||||
@dataclass
|
||||
class BluetoothServiceInfoBleak(BluetoothServiceInfo):
|
||||
@ -134,6 +151,142 @@ class BaseHaScanner:
|
||||
}
|
||||
|
||||
|
||||
class BaseHaRemoteScanner(BaseHaScanner):
|
||||
"""Base class for a Home Assistant remote BLE scanner."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
hass: HomeAssistant,
|
||||
scanner_id: str,
|
||||
new_info_callback: Callable[[BluetoothServiceInfoBleak], None],
|
||||
connector: HaBluetoothConnector,
|
||||
connectable: bool,
|
||||
) -> None:
|
||||
"""Initialize the scanner."""
|
||||
super().__init__(hass, scanner_id)
|
||||
self._new_info_callback = new_info_callback
|
||||
self._discovered_device_advertisement_datas: dict[
|
||||
str, tuple[BLEDevice, AdvertisementData]
|
||||
] = {}
|
||||
self._discovered_device_timestamps: dict[str, float] = {}
|
||||
self._connector = connector
|
||||
self._connectable = connectable
|
||||
self._details: dict[str, str | HaBluetoothConnector] = {"source": scanner_id}
|
||||
self._expire_seconds = FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS
|
||||
if connectable:
|
||||
self._details["connector"] = connector
|
||||
self._expire_seconds = (
|
||||
CONNECTABLE_FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS
|
||||
)
|
||||
|
||||
@hass_callback
|
||||
def async_setup(self) -> CALLBACK_TYPE:
|
||||
"""Set up the scanner."""
|
||||
return async_track_time_interval(
|
||||
self.hass, self._async_expire_devices, timedelta(seconds=30)
|
||||
)
|
||||
|
||||
def _async_expire_devices(self, _datetime: datetime.datetime) -> None:
|
||||
"""Expire old devices."""
|
||||
now = MONOTONIC_TIME()
|
||||
expired = [
|
||||
address
|
||||
for address, timestamp in self._discovered_device_timestamps.items()
|
||||
if now - timestamp > self._expire_seconds
|
||||
]
|
||||
for address in expired:
|
||||
del self._discovered_device_advertisement_datas[address]
|
||||
del self._discovered_device_timestamps[address]
|
||||
|
||||
@property
|
||||
def discovered_devices(self) -> list[BLEDevice]:
|
||||
"""Return a list of discovered devices."""
|
||||
return [
|
||||
device_advertisement_data[0]
|
||||
for device_advertisement_data in self._discovered_device_advertisement_datas.values()
|
||||
]
|
||||
|
||||
@property
|
||||
def discovered_devices_and_advertisement_data(
|
||||
self,
|
||||
) -> dict[str, tuple[BLEDevice, AdvertisementData]]:
|
||||
"""Return a list of discovered devices and advertisement data."""
|
||||
return self._discovered_device_advertisement_datas
|
||||
|
||||
@hass_callback
|
||||
def _async_on_advertisement(
|
||||
self,
|
||||
address: str,
|
||||
rssi: int,
|
||||
local_name: str | None,
|
||||
service_uuids: list[str],
|
||||
service_data: dict[str, bytes],
|
||||
manufacturer_data: dict[int, bytes],
|
||||
tx_power: int | None,
|
||||
) -> None:
|
||||
"""Call the registered callback."""
|
||||
now = MONOTONIC_TIME()
|
||||
if prev_discovery := self._discovered_device_advertisement_datas.get(address):
|
||||
# Merge the new data with the old data
|
||||
# to function the same as BlueZ which
|
||||
# merges the dicts on PropertiesChanged
|
||||
prev_device = prev_discovery[0]
|
||||
prev_advertisement = prev_discovery[1]
|
||||
if (
|
||||
local_name
|
||||
and prev_device.name
|
||||
and len(prev_device.name) > len(local_name)
|
||||
):
|
||||
local_name = prev_device.name
|
||||
if prev_advertisement.service_uuids:
|
||||
service_uuids = list(
|
||||
set(service_uuids + prev_advertisement.service_uuids)
|
||||
)
|
||||
if prev_advertisement.service_data:
|
||||
service_data = {**prev_advertisement.service_data, **service_data}
|
||||
if prev_advertisement.manufacturer_data:
|
||||
manufacturer_data = {
|
||||
**prev_advertisement.manufacturer_data,
|
||||
**manufacturer_data,
|
||||
}
|
||||
|
||||
advertisement_data = AdvertisementData(
|
||||
local_name=None if local_name == "" else local_name,
|
||||
manufacturer_data=manufacturer_data,
|
||||
service_data=service_data,
|
||||
service_uuids=service_uuids,
|
||||
rssi=rssi,
|
||||
tx_power=NO_RSSI_VALUE if tx_power is None else tx_power,
|
||||
platform_data=(),
|
||||
)
|
||||
device = BLEDevice( # type: ignore[no-untyped-call]
|
||||
address=address,
|
||||
name=local_name,
|
||||
details=self._details,
|
||||
rssi=rssi, # deprecated, will be removed in newer bleak
|
||||
)
|
||||
self._discovered_device_advertisement_datas[address] = (
|
||||
device,
|
||||
advertisement_data,
|
||||
)
|
||||
self._discovered_device_timestamps[address] = now
|
||||
self._new_info_callback(
|
||||
BluetoothServiceInfoBleak(
|
||||
name=advertisement_data.local_name or device.name or device.address,
|
||||
address=device.address,
|
||||
rssi=rssi,
|
||||
manufacturer_data=advertisement_data.manufacturer_data,
|
||||
service_data=advertisement_data.service_data,
|
||||
service_uuids=advertisement_data.service_uuids,
|
||||
source=self.source,
|
||||
device=device,
|
||||
advertisement=advertisement_data,
|
||||
connectable=self._connectable,
|
||||
time=now,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
class HaBleakScannerWrapper(BaseBleakScanner):
|
||||
"""A wrapper that uses the single instance."""
|
||||
|
||||
|
@ -1,147 +1,29 @@
|
||||
"""Bluetooth scanner for esphome."""
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Callable
|
||||
import datetime
|
||||
from datetime import timedelta
|
||||
import re
|
||||
import time
|
||||
from typing import Final
|
||||
|
||||
from aioesphomeapi import BluetoothLEAdvertisement
|
||||
from bleak.backends.device import BLEDevice
|
||||
from bleak.backends.scanner import AdvertisementData
|
||||
|
||||
from homeassistant.components.bluetooth import (
|
||||
FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS,
|
||||
BaseHaScanner,
|
||||
BluetoothServiceInfoBleak,
|
||||
HaBluetoothConnector,
|
||||
)
|
||||
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
|
||||
from homeassistant.helpers.event import async_track_time_interval
|
||||
from homeassistant.util.dt import monotonic_time_coarse
|
||||
from homeassistant.components.bluetooth import BaseHaRemoteScanner
|
||||
from homeassistant.core import callback
|
||||
|
||||
TWO_CHAR = re.compile("..")
|
||||
|
||||
# The maximum time between advertisements for a device to be considered
|
||||
# stale when the advertisement tracker can determine the interval for
|
||||
# connectable devices.
|
||||
#
|
||||
# BlueZ uses 180 seconds by default but we give it a bit more time
|
||||
# to account for the esp32's bluetooth stack being a bit slower
|
||||
# than BlueZ's.
|
||||
CONNECTABLE_FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS: Final = 195
|
||||
|
||||
|
||||
class ESPHomeScanner(BaseHaScanner):
|
||||
class ESPHomeScanner(BaseHaRemoteScanner):
|
||||
"""Scanner for esphome."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
hass: HomeAssistant,
|
||||
scanner_id: str,
|
||||
new_info_callback: Callable[[BluetoothServiceInfoBleak], None],
|
||||
connector: HaBluetoothConnector,
|
||||
connectable: bool,
|
||||
) -> None:
|
||||
"""Initialize the scanner."""
|
||||
super().__init__(hass, scanner_id)
|
||||
self._new_info_callback = new_info_callback
|
||||
self._discovered_device_advertisement_datas: dict[
|
||||
str, tuple[BLEDevice, AdvertisementData]
|
||||
] = {}
|
||||
self._discovered_device_timestamps: dict[str, float] = {}
|
||||
self._connector = connector
|
||||
self._connectable = connectable
|
||||
self._details: dict[str, str | HaBluetoothConnector] = {"source": scanner_id}
|
||||
self._fallback_seconds = FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS
|
||||
if connectable:
|
||||
self._details["connector"] = connector
|
||||
self._fallback_seconds = (
|
||||
CONNECTABLE_FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS
|
||||
)
|
||||
|
||||
@callback
|
||||
def async_setup(self) -> CALLBACK_TYPE:
|
||||
"""Set up the scanner."""
|
||||
return async_track_time_interval(
|
||||
self.hass, self._async_expire_devices, timedelta(seconds=30)
|
||||
)
|
||||
|
||||
def _async_expire_devices(self, _datetime: datetime.datetime) -> None:
|
||||
"""Expire old devices."""
|
||||
now = time.monotonic()
|
||||
expired = [
|
||||
address
|
||||
for address, timestamp in self._discovered_device_timestamps.items()
|
||||
if now - timestamp > self._fallback_seconds
|
||||
]
|
||||
for address in expired:
|
||||
del self._discovered_device_advertisement_datas[address]
|
||||
del self._discovered_device_timestamps[address]
|
||||
|
||||
@property
|
||||
def discovered_devices(self) -> list[BLEDevice]:
|
||||
"""Return a list of discovered devices."""
|
||||
return [
|
||||
device_advertisement_data[0]
|
||||
for device_advertisement_data in self._discovered_device_advertisement_datas.values()
|
||||
]
|
||||
|
||||
@property
|
||||
def discovered_devices_and_advertisement_data(
|
||||
self,
|
||||
) -> dict[str, tuple[BLEDevice, AdvertisementData]]:
|
||||
"""Return a list of discovered devices and advertisement data."""
|
||||
return self._discovered_device_advertisement_datas
|
||||
|
||||
@callback
|
||||
def async_on_advertisement(self, adv: BluetoothLEAdvertisement) -> None:
|
||||
"""Call the registered callback."""
|
||||
now = monotonic_time_coarse()
|
||||
address = ":".join(TWO_CHAR.findall("%012X" % adv.address)) # must be upper
|
||||
name = adv.name
|
||||
if prev_discovery := self._discovered_device_advertisement_datas.get(address):
|
||||
# If the last discovery had the full local name
|
||||
# and this one doesn't, keep the old one as we
|
||||
# always want the full local name over the short one
|
||||
prev_device = prev_discovery[0]
|
||||
if len(prev_device.name) > len(adv.name):
|
||||
name = prev_device.name
|
||||
|
||||
advertisement_data = AdvertisementData(
|
||||
local_name=None if name == "" else name,
|
||||
manufacturer_data=adv.manufacturer_data,
|
||||
service_data=adv.service_data,
|
||||
service_uuids=adv.service_uuids,
|
||||
rssi=adv.rssi,
|
||||
tx_power=-127,
|
||||
platform_data=(),
|
||||
)
|
||||
device = BLEDevice( # type: ignore[no-untyped-call]
|
||||
address=address,
|
||||
name=name,
|
||||
details=self._details,
|
||||
rssi=adv.rssi, # deprecated, will be removed in newer bleak
|
||||
)
|
||||
self._discovered_device_advertisement_datas[address] = (
|
||||
device,
|
||||
advertisement_data,
|
||||
)
|
||||
self._discovered_device_timestamps[address] = now
|
||||
self._new_info_callback(
|
||||
BluetoothServiceInfoBleak(
|
||||
name=advertisement_data.local_name or device.name or device.address,
|
||||
address=device.address,
|
||||
rssi=adv.rssi,
|
||||
manufacturer_data=advertisement_data.manufacturer_data,
|
||||
service_data=advertisement_data.service_data,
|
||||
service_uuids=advertisement_data.service_uuids,
|
||||
source=self.source,
|
||||
device=device,
|
||||
advertisement=advertisement_data,
|
||||
connectable=self._connectable,
|
||||
time=now,
|
||||
)
|
||||
self._async_on_advertisement(
|
||||
address,
|
||||
adv.rssi,
|
||||
adv.name,
|
||||
adv.service_uuids,
|
||||
adv.service_data,
|
||||
adv.manufacturer_data,
|
||||
None,
|
||||
)
|
||||
|
@ -1,6 +1,8 @@
|
||||
"""Tests for the Bluetooth integration models."""
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import timedelta
|
||||
import time
|
||||
from unittest.mock import patch
|
||||
|
||||
import bleak
|
||||
@ -10,11 +12,15 @@ from bleak.backends.scanner import AdvertisementData
|
||||
import pytest
|
||||
|
||||
from homeassistant.components.bluetooth.models import (
|
||||
CONNECTABLE_FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS,
|
||||
FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS,
|
||||
BaseHaRemoteScanner,
|
||||
BaseHaScanner,
|
||||
HaBleakClientWrapper,
|
||||
HaBleakScannerWrapper,
|
||||
HaBluetoothConnector,
|
||||
)
|
||||
import homeassistant.util.dt as dt_util
|
||||
|
||||
from . import (
|
||||
_get_manager,
|
||||
@ -23,6 +29,8 @@ from . import (
|
||||
inject_advertisement_with_source,
|
||||
)
|
||||
|
||||
from tests.common import async_fire_time_changed
|
||||
|
||||
|
||||
class MockBleakClient(BleakClient):
|
||||
"""Mock bleak client."""
|
||||
@ -345,3 +353,250 @@ async def test_ble_device_with_proxy_client_out_of_connections_uses_best_availab
|
||||
client.set_disconnected_callback(lambda client: None)
|
||||
await client.disconnect()
|
||||
cancel()
|
||||
|
||||
|
||||
async def test_remote_scanner(hass):
|
||||
"""Test the remote scanner base class merges advertisement_data."""
|
||||
manager = _get_manager()
|
||||
|
||||
switchbot_device = BLEDevice(
|
||||
"44:44:33:11:23:45",
|
||||
"wohand",
|
||||
{},
|
||||
rssi=-100,
|
||||
)
|
||||
switchbot_device_adv = generate_advertisement_data(
|
||||
local_name="wohand",
|
||||
service_uuids=["050a021a-0000-1000-8000-00805f9b34fb"],
|
||||
service_data={"050a021a-0000-1000-8000-00805f9b34fb": b"\n\xff"},
|
||||
manufacturer_data={1: b"\x01"},
|
||||
rssi=-100,
|
||||
)
|
||||
switchbot_device_2 = BLEDevice(
|
||||
"44:44:33:11:23:45",
|
||||
"w",
|
||||
{},
|
||||
rssi=-100,
|
||||
)
|
||||
switchbot_device_adv_2 = generate_advertisement_data(
|
||||
local_name="wohand",
|
||||
service_uuids=["00000001-0000-1000-8000-00805f9b34fb"],
|
||||
service_data={"00000001-0000-1000-8000-00805f9b34fb": b"\n\xff"},
|
||||
manufacturer_data={1: b"\x01", 2: b"\x02"},
|
||||
rssi=-100,
|
||||
)
|
||||
|
||||
class FakeScanner(BaseHaRemoteScanner):
|
||||
def inject_advertisement(
|
||||
self, device: BLEDevice, advertisement_data: AdvertisementData
|
||||
) -> None:
|
||||
"""Inject an advertisement."""
|
||||
self._async_on_advertisement(
|
||||
device.address,
|
||||
advertisement_data.rssi,
|
||||
device.name,
|
||||
advertisement_data.service_uuids,
|
||||
advertisement_data.service_data,
|
||||
advertisement_data.manufacturer_data,
|
||||
advertisement_data.tx_power,
|
||||
)
|
||||
|
||||
new_info_callback = manager.scanner_adv_received
|
||||
connector = (
|
||||
HaBluetoothConnector(MockBleakClient, "mock_bleak_client", lambda: False),
|
||||
)
|
||||
scanner = FakeScanner(hass, "esp32", new_info_callback, connector, True)
|
||||
scanner.async_setup()
|
||||
cancel = manager.async_register_scanner(scanner, True)
|
||||
|
||||
scanner.inject_advertisement(switchbot_device, switchbot_device_adv)
|
||||
|
||||
data = scanner.discovered_devices_and_advertisement_data
|
||||
discovered_device, discovered_adv_data = data[switchbot_device.address]
|
||||
assert discovered_device.address == switchbot_device.address
|
||||
assert discovered_device.name == switchbot_device.name
|
||||
assert (
|
||||
discovered_adv_data.manufacturer_data == switchbot_device_adv.manufacturer_data
|
||||
)
|
||||
assert discovered_adv_data.service_data == switchbot_device_adv.service_data
|
||||
assert discovered_adv_data.service_uuids == switchbot_device_adv.service_uuids
|
||||
scanner.inject_advertisement(switchbot_device_2, switchbot_device_adv_2)
|
||||
|
||||
data = scanner.discovered_devices_and_advertisement_data
|
||||
discovered_device, discovered_adv_data = data[switchbot_device.address]
|
||||
assert discovered_device.address == switchbot_device.address
|
||||
assert discovered_device.name == switchbot_device.name
|
||||
assert discovered_adv_data.manufacturer_data == {1: b"\x01", 2: b"\x02"}
|
||||
assert discovered_adv_data.service_data == {
|
||||
"050a021a-0000-1000-8000-00805f9b34fb": b"\n\xff",
|
||||
"00000001-0000-1000-8000-00805f9b34fb": b"\n\xff",
|
||||
}
|
||||
assert set(discovered_adv_data.service_uuids) == {
|
||||
"050a021a-0000-1000-8000-00805f9b34fb",
|
||||
"00000001-0000-1000-8000-00805f9b34fb",
|
||||
}
|
||||
|
||||
cancel()
|
||||
|
||||
|
||||
async def test_remote_scanner_expires_connectable(hass):
|
||||
"""Test the remote scanner expires stale connectable data."""
|
||||
manager = _get_manager()
|
||||
|
||||
switchbot_device = BLEDevice(
|
||||
"44:44:33:11:23:45",
|
||||
"wohand",
|
||||
{},
|
||||
rssi=-100,
|
||||
)
|
||||
switchbot_device_adv = generate_advertisement_data(
|
||||
local_name="wohand",
|
||||
service_uuids=[],
|
||||
manufacturer_data={1: b"\x01"},
|
||||
rssi=-100,
|
||||
)
|
||||
|
||||
class FakeScanner(BaseHaRemoteScanner):
|
||||
def inject_advertisement(
|
||||
self, device: BLEDevice, advertisement_data: AdvertisementData
|
||||
) -> None:
|
||||
"""Inject an advertisement."""
|
||||
self._async_on_advertisement(
|
||||
device.address,
|
||||
advertisement_data.rssi,
|
||||
device.name,
|
||||
advertisement_data.service_uuids,
|
||||
advertisement_data.service_data,
|
||||
advertisement_data.manufacturer_data,
|
||||
advertisement_data.tx_power,
|
||||
)
|
||||
|
||||
new_info_callback = manager.scanner_adv_received
|
||||
connector = (
|
||||
HaBluetoothConnector(MockBleakClient, "mock_bleak_client", lambda: False),
|
||||
)
|
||||
scanner = FakeScanner(hass, "esp32", new_info_callback, connector, True)
|
||||
scanner.async_setup()
|
||||
cancel = manager.async_register_scanner(scanner, True)
|
||||
|
||||
start_time_monotonic = time.monotonic()
|
||||
scanner.inject_advertisement(switchbot_device, switchbot_device_adv)
|
||||
|
||||
devices = scanner.discovered_devices
|
||||
assert len(scanner.discovered_devices) == 1
|
||||
assert len(scanner.discovered_devices_and_advertisement_data) == 1
|
||||
assert devices[0].name == "wohand"
|
||||
|
||||
expire_monotonic = (
|
||||
start_time_monotonic
|
||||
+ CONNECTABLE_FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS
|
||||
+ 1
|
||||
)
|
||||
expire_utc = dt_util.utcnow() + timedelta(
|
||||
seconds=CONNECTABLE_FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS + 1
|
||||
)
|
||||
with patch(
|
||||
"homeassistant.components.bluetooth.models.MONOTONIC_TIME",
|
||||
return_value=expire_monotonic,
|
||||
):
|
||||
async_fire_time_changed(hass, expire_utc)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
devices = scanner.discovered_devices
|
||||
assert len(scanner.discovered_devices) == 0
|
||||
assert len(scanner.discovered_devices_and_advertisement_data) == 0
|
||||
|
||||
cancel()
|
||||
|
||||
|
||||
async def test_remote_scanner_expires_non_connectable(hass):
|
||||
"""Test the remote scanner expires stale non connectable data."""
|
||||
manager = _get_manager()
|
||||
|
||||
switchbot_device = BLEDevice(
|
||||
"44:44:33:11:23:45",
|
||||
"wohand",
|
||||
{},
|
||||
rssi=-100,
|
||||
)
|
||||
switchbot_device_adv = generate_advertisement_data(
|
||||
local_name="wohand",
|
||||
service_uuids=[],
|
||||
manufacturer_data={1: b"\x01"},
|
||||
rssi=-100,
|
||||
)
|
||||
|
||||
class FakeScanner(BaseHaRemoteScanner):
|
||||
def inject_advertisement(
|
||||
self, device: BLEDevice, advertisement_data: AdvertisementData
|
||||
) -> None:
|
||||
"""Inject an advertisement."""
|
||||
self._async_on_advertisement(
|
||||
device.address,
|
||||
advertisement_data.rssi,
|
||||
device.name,
|
||||
advertisement_data.service_uuids,
|
||||
advertisement_data.service_data,
|
||||
advertisement_data.manufacturer_data,
|
||||
advertisement_data.tx_power,
|
||||
)
|
||||
|
||||
new_info_callback = manager.scanner_adv_received
|
||||
connector = (
|
||||
HaBluetoothConnector(MockBleakClient, "mock_bleak_client", lambda: False),
|
||||
)
|
||||
scanner = FakeScanner(hass, "esp32", new_info_callback, connector, False)
|
||||
scanner.async_setup()
|
||||
cancel = manager.async_register_scanner(scanner, True)
|
||||
|
||||
start_time_monotonic = time.monotonic()
|
||||
scanner.inject_advertisement(switchbot_device, switchbot_device_adv)
|
||||
|
||||
devices = scanner.discovered_devices
|
||||
assert len(scanner.discovered_devices) == 1
|
||||
assert len(scanner.discovered_devices_and_advertisement_data) == 1
|
||||
assert devices[0].name == "wohand"
|
||||
|
||||
assert (
|
||||
FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS
|
||||
> CONNECTABLE_FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS
|
||||
)
|
||||
|
||||
# The connectable timeout is not used for non connectable devices
|
||||
expire_monotonic = (
|
||||
start_time_monotonic
|
||||
+ CONNECTABLE_FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS
|
||||
+ 1
|
||||
)
|
||||
expire_utc = dt_util.utcnow() + timedelta(
|
||||
seconds=CONNECTABLE_FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS + 1
|
||||
)
|
||||
with patch(
|
||||
"homeassistant.components.bluetooth.models.MONOTONIC_TIME",
|
||||
return_value=expire_monotonic,
|
||||
):
|
||||
async_fire_time_changed(hass, expire_utc)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert len(scanner.discovered_devices) == 1
|
||||
assert len(scanner.discovered_devices_and_advertisement_data) == 1
|
||||
|
||||
# The non connectable timeout is used for non connectable devices
|
||||
# which is always longer than the connectable timeout
|
||||
expire_monotonic = (
|
||||
start_time_monotonic + FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS + 1
|
||||
)
|
||||
expire_utc = dt_util.utcnow() + timedelta(
|
||||
seconds=FALLBACK_MAXIMUM_STALE_ADVERTISEMENT_SECONDS + 1
|
||||
)
|
||||
with patch(
|
||||
"homeassistant.components.bluetooth.models.MONOTONIC_TIME",
|
||||
return_value=expire_monotonic,
|
||||
):
|
||||
async_fire_time_changed(hass, expire_utc)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert len(scanner.discovered_devices) == 0
|
||||
assert len(scanner.discovered_devices_and_advertisement_data) == 0
|
||||
|
||||
cancel()
|
||||
|
Loading…
x
Reference in New Issue
Block a user