mirror of
https://github.com/home-assistant/core.git
synced 2025-11-10 11:29:46 +00:00
149 lines
4.8 KiB
Python
149 lines
4.8 KiB
Python
"""Coordinator for the ToGrill Bluetooth integration."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from datetime import timedelta
|
|
import logging
|
|
|
|
from bleak.exc import BleakError
|
|
from togrill_bluetooth.client import Client
|
|
from togrill_bluetooth.exceptions import DecodeError
|
|
from togrill_bluetooth.packets import Packet, PacketA0Notify, PacketA1Notify
|
|
|
|
from homeassistant.components import bluetooth
|
|
from homeassistant.components.bluetooth import (
|
|
BluetoothCallbackMatcher,
|
|
BluetoothChange,
|
|
BluetoothScanningMode,
|
|
BluetoothServiceInfoBleak,
|
|
async_register_callback,
|
|
)
|
|
from homeassistant.config_entries import ConfigEntry
|
|
from homeassistant.const import CONF_ADDRESS, CONF_MODEL
|
|
from homeassistant.core import HomeAssistant, callback
|
|
from homeassistant.helpers import device_registry as dr
|
|
from homeassistant.helpers.device_registry import CONNECTION_BLUETOOTH, DeviceInfo
|
|
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
|
|
|
|
type ToGrillConfigEntry = ConfigEntry[ToGrillCoordinator]
|
|
|
|
SCAN_INTERVAL = timedelta(seconds=30)
|
|
LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
def get_version_string(packet: PacketA0Notify) -> str:
    """Return the version carried by *packet* formatted as ``major.minor``."""
    major, minor = packet.version_major, packet.version_minor
    return f"{major}.{minor}"
|
|
|
|
|
|
class DeviceNotFound(UpdateFailed):
    """Update failed because the Bluetooth device is not currently reachable."""
|
|
|
|
|
|
class DeviceFailed(UpdateFailed):
    """Update failed because communicating with the device raised an error."""
|
|
|
|
|
|
class ToGrillCoordinator(DataUpdateCoordinator[dict[int, Packet]]):
    """Coordinator that connects to a ToGrill device and polls it.

    ``data`` maps ``packet.type`` to the most recent packet of that type
    handed to ``_notify_callback``.
    """

    config_entry: ToGrillConfigEntry
    # Current device connection; None until a connection has been established
    # and again after a stale connection is dropped or on shutdown.
    client: Client | None = None

    def __init__(
        self,
        hass: HomeAssistant,
        config_entry: ToGrillConfigEntry,
    ) -> None:
        """Initialize global data updater."""
        super().__init__(
            hass=hass,
            logger=LOGGER,
            config_entry=config_entry,
            name="ToGrill",
            update_interval=SCAN_INTERVAL,
        )
        self.address = config_entry.data[CONF_ADDRESS]
        self.data = {}
        self.device_info = DeviceInfo(
            connections={(CONNECTION_BLUETOOTH, self.address)}
        )

        # Watch for Bluetooth events from this address so a device that was
        # previously not found can be picked up again; the callback is
        # unregistered when the config entry unloads.
        config_entry.async_on_unload(
            async_register_callback(
                hass,
                self._async_handle_bluetooth_event,
                BluetoothCallbackMatcher(address=self.address, connectable=True),
                BluetoothScanningMode.ACTIVE,
            )
        )

    async def _connect_and_update_registry(self) -> Client:
        """Connect to the device and record its details in the device registry.

        Returns:
            The connected client.

        Raises:
            DeviceNotFound: No connectable device is known for the address,
                or establishing the connection failed.
            DeviceFailed: Reading the initial ``PacketA0Notify`` failed.
        """
        device = bluetooth.async_ble_device_from_address(
            self.hass, self.address, connectable=True
        )
        if not device:
            raise DeviceNotFound("Unable to find device")

        # A failed connection attempt must surface as DeviceNotFound (an
        # UpdateFailed) rather than a raw BleakError, otherwise
        # _async_handle_bluetooth_event never retries on the next event.
        try:
            client = await Client.connect(device, self._notify_callback)
        except BleakError as exc:
            raise DeviceNotFound("Unable to connect to device") from exc

        try:
            packet_a0 = await client.read(PacketA0Notify)
        except (BleakError, DecodeError) as exc:
            # Do not keep a connection whose first read already failed.
            await client.disconnect()
            raise DeviceFailed(f"Device failed {exc}") from exc

        config_entry = self.config_entry

        device_registry = dr.async_get(self.hass)
        device_registry.async_get_or_create(
            config_entry_id=config_entry.entry_id,
            connections={(CONNECTION_BLUETOOTH, self.address)},
            name=config_entry.data[CONF_MODEL],
            model_id=config_entry.data[CONF_MODEL],
            sw_version=get_version_string(packet_a0),
        )

        return client

    async def async_shutdown(self) -> None:
        """Shutdown coordinator and disconnect from device."""
        await super().async_shutdown()
        if self.client:
            await self.client.disconnect()
            self.client = None

    async def _get_connected_client(self) -> Client:
        """Return the current client, reconnecting if it is missing or stale."""
        if self.client and not self.client.is_connected:
            # Drop a client that lost its connection before reconnecting.
            await self.client.disconnect()
            self.client = None
        if self.client:
            return self.client

        self.client = await self._connect_and_update_registry()
        return self.client

    def _notify_callback(self, packet: Packet) -> None:
        """Store *packet* under its type and notify listeners.

        Passed to ``Client.connect`` as the packet callback.
        """
        self.data[packet.type] = packet
        self.async_update_listeners()

    async def _async_update_data(self) -> dict[int, Packet]:
        """Poll the device."""
        client = await self._get_connected_client()
        try:
            await client.request(PacketA0Notify)
            await client.request(PacketA1Notify)
        except BleakError as exc:
            raise DeviceFailed(f"Device failed {exc}") from exc
        # The requests themselves return nothing here; the packets are
        # collected into self.data by _notify_callback.
        return self.data

    @callback
    def _async_handle_bluetooth_event(
        self,
        service_info: BluetoothServiceInfoBleak,
        change: BluetoothChange,
    ) -> None:
        """Handle a Bluetooth event."""
        # Only retry when there is no client and the last update failed
        # because the device was not found.
        if not self.client and isinstance(self.last_exception, DeviceNotFound):
            self.hass.async_create_task(self.async_refresh())
|