Add bluetooth discovery to HomeKit Controller (#75333)

Co-authored-by: Jc2k <john.carr@unrouted.co.uk>
This commit is contained in:
J. Nick Koston
2022-07-17 08:19:05 -05:00
committed by GitHub
parent 503b31fb15
commit 8d63f81821
8 changed files with 268 additions and 55 deletions

View File

@@ -1,13 +1,17 @@
"""Config flow to configure homekit_controller."""
from __future__ import annotations
from collections.abc import Awaitable
import logging
import re
from typing import Any
from typing import TYPE_CHECKING, Any, cast
import aiohomekit
from aiohomekit.controller.abstract import AbstractPairing
from aiohomekit import Controller, const as aiohomekit_const
from aiohomekit.controller.abstract import AbstractDiscovery, AbstractPairing
from aiohomekit.exceptions import AuthenticationError
from aiohomekit.model.categories import Categories
from aiohomekit.model.status_flags import StatusFlags
from aiohomekit.utils import domain_supported, domain_to_name
import voluptuous as vol
@@ -16,6 +20,7 @@ from homeassistant.components import zeroconf
from homeassistant.core import callback
from homeassistant.data_entry_flow import AbortFlow, FlowResult
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.service_info import bluetooth
from .connection import HKDevice
from .const import DOMAIN, KNOWN_DEVICES
@@ -41,6 +46,8 @@ PIN_FORMAT = re.compile(r"^(\d{3})-{0,1}(\d{2})-{0,1}(\d{3})$")
_LOGGER = logging.getLogger(__name__)
BLE_DEFAULT_NAME = "Bluetooth device"
INSECURE_CODES = {
"00000000",
"11111111",
@@ -62,6 +69,11 @@ def normalize_hkid(hkid: str) -> str:
return hkid.lower()
def formatted_category(category: Categories) -> str:
"""Return a human readable category name."""
return str(category.name).replace("_", " ").title()
@callback
def find_existing_host(hass, serial: str) -> config_entries.ConfigEntry | None:
"""Return a set of the configured hosts."""
@@ -92,14 +104,15 @@ class HomekitControllerFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
VERSION = 1
def __init__(self):
def __init__(self) -> None:
"""Initialize the homekit_controller flow."""
self.model = None
self.hkid = None
self.name = None
self.devices = {}
self.controller = None
self.finish_pairing = None
self.model: str | None = None
self.hkid: str | None = None
self.name: str | None = None
self.category: Categories | None = None
self.devices: dict[str, AbstractDiscovery] = {}
self.controller: Controller | None = None
self.finish_pairing: Awaitable[AbstractPairing] | None = None
async def _async_setup_controller(self):
"""Create the controller."""
@@ -111,9 +124,11 @@ class HomekitControllerFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
if user_input is not None:
key = user_input["device"]
self.hkid = self.devices[key].description.id
self.model = self.devices[key].description.model
self.name = self.devices[key].description.name
discovery = self.devices[key]
self.category = discovery.description.category
self.hkid = discovery.description.id
self.model = getattr(discovery.description, "model", BLE_DEFAULT_NAME)
self.name = discovery.description.name or BLE_DEFAULT_NAME
await self.async_set_unique_id(
normalize_hkid(self.hkid), raise_on_progress=False
@@ -138,7 +153,14 @@ class HomekitControllerFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
step_id="user",
errors=errors,
data_schema=vol.Schema(
{vol.Required("device"): vol.In(self.devices.keys())}
{
vol.Required("device"): vol.In(
{
key: f"{key} ({formatted_category(discovery.description.category)})"
for key, discovery in self.devices.items()
}
)
}
),
)
@@ -151,13 +173,14 @@ class HomekitControllerFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
await self._async_setup_controller()
try:
device = await self.controller.async_find(unique_id)
discovery = await self.controller.async_find(unique_id)
except aiohomekit.AccessoryNotFoundError:
return self.async_abort(reason="accessory_not_found_error")
self.name = device.description.name
self.model = device.description.model
self.hkid = device.description.id
self.name = discovery.description.name
self.model = discovery.description.model
self.category = discovery.description.category
self.hkid = discovery.description.id
return self._async_step_pair_show_form()
@@ -213,6 +236,7 @@ class HomekitControllerFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
model = properties["md"]
name = domain_to_name(discovery_info.name)
status_flags = int(properties["sf"])
category = Categories(int(properties.get("ci", 0)))
paired = not status_flags & 0x01
# The configuration number increases every time the characteristic map
@@ -326,6 +350,7 @@ class HomekitControllerFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
self.name = name
self.model = model
self.category = category
self.hkid = hkid
# We want to show the pairing form - but don't call async_step_pair
@@ -333,6 +358,55 @@ class HomekitControllerFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
# pairing code)
return self._async_step_pair_show_form()
async def async_step_bluetooth(
self, discovery_info: bluetooth.BluetoothServiceInfo
) -> FlowResult:
"""Handle the bluetooth discovery step."""
if not aiohomekit_const.BLE_TRANSPORT_SUPPORTED:
return self.async_abort(reason="ignored_model")
# Late imports in case BLE is not available
from aiohomekit.controller.ble.discovery import ( # pylint: disable=import-outside-toplevel
BleDiscovery,
)
from aiohomekit.controller.ble.manufacturer_data import ( # pylint: disable=import-outside-toplevel
HomeKitAdvertisement,
)
await self.async_set_unique_id(discovery_info.address)
self._abort_if_unique_id_configured()
mfr_data = discovery_info.manufacturer_data
try:
device = HomeKitAdvertisement.from_manufacturer_data(
discovery_info.name, discovery_info.address, mfr_data
)
except ValueError:
return self.async_abort(reason="ignored_model")
if not (device.status_flags & StatusFlags.UNPAIRED):
return self.async_abort(reason="already_paired")
if self.controller is None:
await self._async_setup_controller()
assert self.controller is not None
try:
discovery = await self.controller.async_find(device.id)
except aiohomekit.AccessoryNotFoundError:
return self.async_abort(reason="accessory_not_found_error")
if TYPE_CHECKING:
discovery = cast(BleDiscovery, discovery)
self.name = discovery.description.name
self.model = BLE_DEFAULT_NAME
self.category = discovery.description.category
self.hkid = discovery.description.id
return self._async_step_pair_show_form()
async def async_step_pair(self, pair_info=None):
"""Pair with a new HomeKit accessory."""
# If async_step_pair is called with no pairing code then we do the M1
@@ -453,8 +527,10 @@ class HomekitControllerFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
@callback
def _async_step_pair_show_form(self, errors=None):
placeholders = {"name": self.name}
self.context["title_placeholders"] = {"name": self.name}
placeholders = self.context["title_placeholders"] = {
"name": self.name,
"category": formatted_category(self.category),
}
schema = {vol.Required("pairing_code"): vol.All(str, vol.Strip)}
if errors and errors.get("pairing_code") == "insecure_setup_code":