Add webhook in switchbot cloud integration (#132882)

* add webhook in switchbot cloud integration

* Rename _need_initialized to _is_initialized and reduce nb line in async_setup_entry

* Add unit tests

* Enhance poll management

* fix

---------

Co-authored-by: Joostlek <joostlek@outlook.com>
This commit is contained in:
Gigatrappeur 2025-05-22 12:19:08 +02:00 committed by GitHub
parent d870410413
commit c68e663a1c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 257 additions and 13 deletions

View File

@ -1,17 +1,21 @@
"""SwitchBot via API integration."""
from asyncio import gather
from collections.abc import Awaitable, Callable
import contextlib
from dataclasses import dataclass, field
from logging import getLogger
from aiohttp import web
from switchbot_api import CannotConnect, Device, InvalidAuth, Remote, SwitchBotAPI
from homeassistant.components import webhook
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_API_KEY, CONF_API_TOKEN, Platform
from homeassistant.const import CONF_API_KEY, CONF_API_TOKEN, CONF_WEBHOOK_ID, Platform
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryNotReady
from .const import DOMAIN
from .const import DOMAIN, ENTRY_TITLE
from .coordinator import SwitchBotCoordinator
_LOGGER = getLogger(__name__)
@ -30,13 +34,17 @@ PLATFORMS: list[Platform] = [
class SwitchbotDevices:
"""Switchbot devices data."""
binary_sensors: list[Device] = field(default_factory=list)
buttons: list[Device] = field(default_factory=list)
climates: list[Remote] = field(default_factory=list)
switches: list[Device | Remote] = field(default_factory=list)
sensors: list[Device] = field(default_factory=list)
vacuums: list[Device] = field(default_factory=list)
locks: list[Device] = field(default_factory=list)
binary_sensors: list[tuple[Device, SwitchBotCoordinator]] = field(
default_factory=list
)
buttons: list[tuple[Device, SwitchBotCoordinator]] = field(default_factory=list)
climates: list[tuple[Remote, SwitchBotCoordinator]] = field(default_factory=list)
switches: list[tuple[Device | Remote, SwitchBotCoordinator]] = field(
default_factory=list
)
sensors: list[tuple[Device, SwitchBotCoordinator]] = field(default_factory=list)
vacuums: list[tuple[Device, SwitchBotCoordinator]] = field(default_factory=list)
locks: list[tuple[Device, SwitchBotCoordinator]] = field(default_factory=list)
@dataclass
@ -53,10 +61,12 @@ async def coordinator_for_device(
api: SwitchBotAPI,
device: Device | Remote,
coordinators_by_id: dict[str, SwitchBotCoordinator],
manageable_by_webhook: bool = False,
) -> SwitchBotCoordinator:
"""Instantiate coordinator and adds to list for gathering."""
coordinator = coordinators_by_id.setdefault(
device.device_id, SwitchBotCoordinator(hass, entry, api, device)
device.device_id,
SwitchBotCoordinator(hass, entry, api, device, manageable_by_webhook),
)
if coordinator.data is None:
@ -133,7 +143,7 @@ async def make_device_data(
"Robot Vacuum Cleaner S1 Plus",
]:
coordinator = await coordinator_for_device(
hass, entry, api, device, coordinators_by_id
hass, entry, api, device, coordinators_by_id, True
)
devices_data.vacuums.append((device, coordinator))
@ -182,7 +192,11 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
hass.data[DOMAIN][entry.entry_id] = SwitchbotCloudData(
api=api, devices=switchbot_devices
)
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
await _initialize_webhook(hass, entry, api, coordinators_by_id)
return True
@ -192,3 +206,120 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
hass.data[DOMAIN].pop(entry.entry_id)
return unload_ok
async def _initialize_webhook(
hass: HomeAssistant,
entry: ConfigEntry,
api: SwitchBotAPI,
coordinators_by_id: dict[str, SwitchBotCoordinator],
) -> None:
"""Initialize webhook if needed."""
if any(
coordinator.manageable_by_webhook()
for coordinator in coordinators_by_id.values()
):
if CONF_WEBHOOK_ID not in entry.data:
new_data = entry.data.copy()
if CONF_WEBHOOK_ID not in new_data:
# create new id and new conf
new_data[CONF_WEBHOOK_ID] = webhook.async_generate_id()
hass.config_entries.async_update_entry(entry, data=new_data)
# register webhook
webhook_name = ENTRY_TITLE
if entry.title != ENTRY_TITLE:
webhook_name = f"{ENTRY_TITLE} {entry.title}"
with contextlib.suppress(Exception):
webhook.async_register(
hass,
DOMAIN,
webhook_name,
entry.data[CONF_WEBHOOK_ID],
_create_handle_webhook(coordinators_by_id),
)
webhook_url = webhook.async_generate_url(
hass,
entry.data[CONF_WEBHOOK_ID],
)
# check if webhook is configured in switchbot cloud
check_webhook_result = None
with contextlib.suppress(Exception):
check_webhook_result = await api.get_webook_configuration()
actual_webhook_urls = (
check_webhook_result["urls"]
if check_webhook_result and "urls" in check_webhook_result
else []
)
need_add_webhook = (
len(actual_webhook_urls) == 0 or webhook_url not in actual_webhook_urls
)
need_clean_previous_webhook = (
len(actual_webhook_urls) > 0 and webhook_url not in actual_webhook_urls
)
if need_clean_previous_webhook:
# it seems is impossible to register multiple webhook.
# So, if webhook already exists, we delete it
await api.delete_webhook(actual_webhook_urls[0])
_LOGGER.debug(
"Deleted previous Switchbot cloud webhook url: %s",
actual_webhook_urls[0],
)
if need_add_webhook:
# call api for register webhookurl
await api.setup_webhook(webhook_url)
_LOGGER.debug("Registered Switchbot cloud webhook at hass: %s", webhook_url)
for coordinator in coordinators_by_id.values():
coordinator.webhook_subscription_listener(True)
_LOGGER.debug("Registered Switchbot cloud webhook at: %s", webhook_url)
def _create_handle_webhook(
coordinators_by_id: dict[str, SwitchBotCoordinator],
) -> Callable[[HomeAssistant, str, web.Request], Awaitable[None]]:
"""Create a webhook handler."""
async def _internal_handle_webhook(
hass: HomeAssistant, webhook_id: str, request: web.Request
) -> None:
"""Handle webhook callback."""
if not request.body_exists:
_LOGGER.debug("Received invalid request from switchbot webhook")
return
data = await request.json()
# Structure validation
if (
not isinstance(data, dict)
or "eventType" not in data
or data["eventType"] != "changeReport"
or "eventVersion" not in data
or data["eventVersion"] != "1"
or "context" not in data
or not isinstance(data["context"], dict)
or "deviceType" not in data["context"]
or "deviceMac" not in data["context"]
):
_LOGGER.debug("Received invalid data from switchbot webhook %s", repr(data))
return
deviceMac = data["context"]["deviceMac"]
if deviceMac not in coordinators_by_id:
_LOGGER.error(
"Received data for unknown entity from switchbot webhook: %s", data
)
return
coordinators_by_id[deviceMac].async_set_updated_data(data["context"])
return _internal_handle_webhook

View File

@ -23,6 +23,8 @@ class SwitchBotCoordinator(DataUpdateCoordinator[Status]):
config_entry: ConfigEntry
_api: SwitchBotAPI
_device_id: str
_manageable_by_webhook: bool
_webhooks_connected: bool = False
def __init__(
self,
@ -30,6 +32,7 @@ class SwitchBotCoordinator(DataUpdateCoordinator[Status]):
config_entry: ConfigEntry,
api: SwitchBotAPI,
device: Device | Remote,
manageable_by_webhook: bool,
) -> None:
"""Initialize SwitchBot Cloud."""
super().__init__(
@ -42,6 +45,20 @@ class SwitchBotCoordinator(DataUpdateCoordinator[Status]):
self._api = api
self._device_id = device.device_id
self._should_poll = not isinstance(device, Remote)
self._manageable_by_webhook = manageable_by_webhook
def webhook_subscription_listener(self, connected: bool) -> None:
"""Call when webhook status changed."""
if self._manageable_by_webhook:
self._webhooks_connected = connected
if connected:
self.update_interval = None
else:
self.update_interval = DEFAULT_SCAN_INTERVAL
def manageable_by_webhook(self) -> bool:
"""Return update_by_webhook value."""
return self._manageable_by_webhook
async def _async_update_data(self) -> Status:
"""Fetch data from API endpoint."""

View File

@ -3,6 +3,7 @@
"name": "SwitchBot Cloud",
"codeowners": ["@SeraphicRav", "@laurence-presland", "@Gigatrappeur"],
"config_flow": true,
"dependencies": ["webhook"],
"documentation": "https://www.home-assistant.io/integrations/switchbot_cloud",
"integration_type": "hub",
"iot_class": "cloud_polling",

View File

@ -7,11 +7,14 @@ from switchbot_api import CannotConnect, Device, InvalidAuth, PowerState, Remote
from homeassistant.components.switchbot_cloud import SwitchBotAPI
from homeassistant.config_entries import ConfigEntryState
from homeassistant.const import EVENT_HOMEASSISTANT_START
from homeassistant.const import CONF_WEBHOOK_ID, EVENT_HOMEASSISTANT_START
from homeassistant.core import HomeAssistant
from homeassistant.core_config import async_process_ha_core_config
from . import configure_integration
from tests.typing import ClientSessionGenerator
@pytest.fixture
def mock_list_devices():
@ -27,10 +30,43 @@ def mock_get_status():
yield mock_get_status
@pytest.fixture
def mock_get_webook_configuration():
"""Mock get_status."""
with patch.object(
SwitchBotAPI, "get_webook_configuration"
) as mock_get_webook_configuration:
yield mock_get_webook_configuration
@pytest.fixture
def mock_delete_webhook():
"""Mock get_status."""
with patch.object(SwitchBotAPI, "delete_webhook") as mock_delete_webhook:
yield mock_delete_webhook
@pytest.fixture
def mock_setup_webhook():
"""Mock get_status."""
with patch.object(SwitchBotAPI, "setup_webhook") as mock_setup_webhook:
yield mock_setup_webhook
async def test_setup_entry_success(
hass: HomeAssistant, mock_list_devices, mock_get_status
hass: HomeAssistant,
mock_list_devices,
mock_get_status,
mock_get_webook_configuration,
mock_delete_webhook,
mock_setup_webhook,
) -> None:
"""Test successful setup of entry."""
await async_process_ha_core_config(
hass,
{"external_url": "https://example.com"},
)
mock_get_webook_configuration.return_value = {"urls": ["https://example.com"]}
mock_list_devices.return_value = [
Remote(
version="V1.0",
@ -67,8 +103,15 @@ async def test_setup_entry_success(
deviceType="Hub 2",
hubDeviceId="test-hub-id",
),
Device(
deviceId="vacuum-1",
deviceName="vacuum-name-1",
deviceType="K10+",
hubDeviceId=None,
),
]
mock_get_status.return_value = {"power": PowerState.ON.value}
entry = await configure_integration(hass)
assert entry.state is ConfigEntryState.LOADED
@ -76,6 +119,9 @@ async def test_setup_entry_success(
await hass.async_block_till_done()
mock_list_devices.assert_called_once()
mock_get_status.assert_called()
mock_get_webook_configuration.assert_called_once()
mock_delete_webhook.assert_called_once()
mock_setup_webhook.assert_called_once()
@pytest.mark.parametrize(
@ -124,3 +170,52 @@ async def test_setup_entry_fails_when_refreshing(
await hass.async_block_till_done()
mock_list_devices.assert_called_once()
mock_get_status.assert_called()
async def test_posting_to_webhook(
hass: HomeAssistant,
mock_list_devices,
mock_get_status,
mock_get_webook_configuration,
mock_delete_webhook,
mock_setup_webhook,
hass_client_no_auth: ClientSessionGenerator,
) -> None:
"""Test handler webhook call."""
await async_process_ha_core_config(
hass,
{"external_url": "https://example.com"},
)
mock_get_webook_configuration.return_value = {"urls": ["https://example.com"]}
mock_list_devices.return_value = [
Device(
deviceId="vacuum-1",
deviceName="vacuum-name-1",
deviceType="K10+",
hubDeviceId=None,
),
]
mock_get_status.return_value = {"power": PowerState.ON.value}
mock_delete_webhook.return_value = {}
mock_setup_webhook.return_value = {}
entry = await configure_integration(hass)
assert entry.state is ConfigEntryState.LOADED
hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
webhook_id = entry.data[CONF_WEBHOOK_ID]
client = await hass_client_no_auth()
# fire webhook
await client.post(
f"/api/webhook/{webhook_id}",
json={
"eventType": "changeReport",
"eventVersion": "1",
"context": {"deviceType": "...", "deviceMac": "vacuum-1"},
},
)
await hass.async_block_till_done()
mock_setup_webhook.assert_called_once()