Improve bluetooth tracker device code (#26067)

* Improve bluetooth device tracker code

* Don't use set operations

* Fix logging template interpolation

* Warn if not tracking new devices and not devices to track

* Updates due to CR

* Fix pylint warning

* Fix pylint import warning

* Merge with dev
This commit is contained in:
Gilad Peleg 2019-09-12 19:01:55 +03:00 committed by Martin Hjelmare
parent 284ae01560
commit 25ef4a156f

View File

@ -1,25 +1,30 @@
"""Tracking for bluetooth devices.""" """Tracking for bluetooth devices."""
import logging import logging
from typing import List, Set, Tuple
# pylint: disable=import-error
import bluetooth
from bt_proximity import BluetoothRSSI
import voluptuous as vol import voluptuous as vol
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.event import track_point_in_utc_time
from homeassistant.components.device_tracker import PLATFORM_SCHEMA from homeassistant.components.device_tracker import PLATFORM_SCHEMA
from homeassistant.components.device_tracker.const import (
CONF_SCAN_INTERVAL,
CONF_TRACK_NEW,
DEFAULT_TRACK_NEW,
DOMAIN,
SCAN_INTERVAL,
SOURCE_TYPE_BLUETOOTH,
)
from homeassistant.components.device_tracker.legacy import ( from homeassistant.components.device_tracker.legacy import (
YAML_DEVICES, YAML_DEVICES,
async_load_config, async_load_config,
) )
from homeassistant.components.device_tracker.const import ( import homeassistant.helpers.config_validation as cv
CONF_TRACK_NEW, from homeassistant.helpers.event import track_point_in_utc_time
CONF_SCAN_INTERVAL, from homeassistant.helpers.typing import HomeAssistantType
SCAN_INTERVAL,
DEFAULT_TRACK_NEW,
SOURCE_TYPE_BLUETOOTH,
DOMAIN,
)
import homeassistant.util.dt as dt_util
from homeassistant.util.async_ import run_coroutine_threadsafe from homeassistant.util.async_ import run_coroutine_threadsafe
import homeassistant.util.dt as dt_util
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -42,66 +47,86 @@ PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
) )
def setup_scanner(hass, config, see, discovery_info=None): def is_bluetooth_device(device) -> bool:
"""Set up the Bluetooth Scanner.""" """Check whether a device is a bluetooth device by its mac."""
# pylint: disable=import-error return device.mac and device.mac[:3].upper() == BT_PREFIX
import bluetooth
from bt_proximity import BluetoothRSSI
def see_device(mac, name, rssi=None):
"""Mark a device as seen."""
attributes = {}
if rssi is not None:
attributes["rssi"] = rssi
see(
mac=f"{BT_PREFIX}{mac}",
host_name=name,
attributes=attributes,
source_type=SOURCE_TYPE_BLUETOOTH,
)
device_id = config.get(CONF_DEVICE_ID) def discover_devices(device_id: int) -> List[Tuple[str, str]]:
"""Discover Bluetooth devices."""
result = bluetooth.discover_devices(
duration=8,
lookup_names=True,
flush_cache=True,
lookup_class=False,
device_id=device_id,
)
_LOGGER.debug("Bluetooth devices discovered = %d", len(result))
return result
def discover_devices():
"""Discover Bluetooth devices."""
result = bluetooth.discover_devices(
duration=8,
lookup_names=True,
flush_cache=True,
lookup_class=False,
device_id=device_id,
)
_LOGGER.debug("Bluetooth devices discovered = %d", len(result))
return result
yaml_path = hass.config.path(YAML_DEVICES) def see_device(see, mac: str, device_name: str, rssi=None) -> None:
devs_to_track = [] """Mark a device as seen."""
devs_donot_track = [] attributes = {}
if rssi is not None:
attributes["rssi"] = rssi
see(
mac=f"{BT_PREFIX}{mac}",
host_name=device_name,
attributes=attributes,
source_type=SOURCE_TYPE_BLUETOOTH,
)
def get_tracking_devices(hass: HomeAssistantType) -> Tuple[Set[str], Set[str]]:
"""
Load all known devices.
We just need the devices so set consider_home and home range to 0
"""
yaml_path: str = hass.config.path(YAML_DEVICES)
devices_to_track: Set[str] = set()
devices_to_not_track: Set[str] = set()
# Load all known devices.
# We just need the devices so set consider_home and home range
# to 0
for device in run_coroutine_threadsafe( for device in run_coroutine_threadsafe(
async_load_config(yaml_path, hass, 0), hass.loop async_load_config(yaml_path, hass, 0), hass.loop
).result(): ).result():
# Check if device is a valid bluetooth device # Check if device is a valid bluetooth device
if device.mac and device.mac[:3].upper() == BT_PREFIX: if not is_bluetooth_device(device):
if device.track: continue
devs_to_track.append(device.mac[3:])
else: normalized_mac: str = device.mac[3:]
devs_donot_track.append(device.mac[3:]) if device.track:
devices_to_track.add(normalized_mac)
else:
devices_to_not_track.add(normalized_mac)
return devices_to_track, devices_to_not_track
def setup_scanner(hass: HomeAssistantType, config: dict, see, discovery_info=None):
"""Set up the Bluetooth Scanner."""
device_id: int = config.get(CONF_DEVICE_ID)
devices_to_track, devices_to_not_track = get_tracking_devices(hass)
# If track new devices is true discover new devices on startup. # If track new devices is true discover new devices on startup.
track_new = config.get(CONF_TRACK_NEW, DEFAULT_TRACK_NEW) track_new: bool = config.get(CONF_TRACK_NEW, DEFAULT_TRACK_NEW)
_LOGGER.debug("Tracking new devices = %s", track_new)
if not devices_to_track and not track_new:
_LOGGER.debug("No Bluetooth devices to track and not tracking new devices")
if track_new: if track_new:
for dev in discover_devices(): for mac, device_name in discover_devices(device_id):
if dev[0] not in devs_to_track and dev[0] not in devs_donot_track: if mac not in devices_to_track and mac not in devices_to_not_track:
devs_to_track.append(dev[0]) devices_to_track.add(mac)
see_device(dev[0], dev[1]) see_device(see, mac, device_name)
interval = config.get(CONF_SCAN_INTERVAL, SCAN_INTERVAL) interval = config.get(CONF_SCAN_INTERVAL, SCAN_INTERVAL)
request_rssi = config.get(CONF_REQUEST_RSSI, False) request_rssi = config.get(CONF_REQUEST_RSSI, False)
if request_rssi:
_LOGGER.debug("Detecting RSSI for devices")
def update_bluetooth(_): def update_bluetooth(_):
"""Update Bluetooth and set timer for the next update.""" """Update Bluetooth and set timer for the next update."""
@ -112,21 +137,22 @@ def setup_scanner(hass, config, see, discovery_info=None):
"""Lookup Bluetooth device and update status.""" """Lookup Bluetooth device and update status."""
try: try:
if track_new: if track_new:
for dev in discover_devices(): for mac, device_name in discover_devices(device_id):
if dev[0] not in devs_to_track and dev[0] not in devs_donot_track: if mac not in devices_to_track and mac not in devices_to_not_track:
devs_to_track.append(dev[0]) devices_to_track.add(mac)
for mac in devs_to_track:
for mac in devices_to_track:
_LOGGER.debug("Scanning %s", mac) _LOGGER.debug("Scanning %s", mac)
result = bluetooth.lookup_name(mac, timeout=5) device_name = bluetooth.lookup_name(mac, timeout=5)
rssi = None rssi = None
if request_rssi: if request_rssi:
client = BluetoothRSSI(mac) client = BluetoothRSSI(mac)
rssi = client.request_rssi() rssi = client.request_rssi()
client.close() client.close()
if result is None: if device_name is None:
# Could not lookup device name # Could not lookup device name
continue continue
see_device(mac, result, rssi) see_device(see, mac, device_name, rssi)
except bluetooth.BluetoothError: except bluetooth.BluetoothError:
_LOGGER.exception("Error looking up Bluetooth device") _LOGGER.exception("Error looking up Bluetooth device")