Add WebSocket API to ssdp to observe discovery (#143862)

This commit is contained in:
J. Nick Koston 2025-04-29 21:03:53 +02:00 committed by GitHub
parent 08fe6653bb
commit 89abc5ac69
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 226 additions and 17 deletions

View File

@ -36,6 +36,7 @@ from homeassistant.helpers.typing import ConfigType
from homeassistant.loader import async_get_ssdp, bind_hass
from homeassistant.util.logging import catch_log_exception
from . import websocket_api
from .const import DOMAIN, SSDP_SCANNER, UPNP_SERVER
from .scanner import (
IntegrationMatchers,
@ -213,6 +214,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
await scanner.async_start()
await server.async_start()
websocket_api.async_setup(hass)
return True

View File

@ -0,0 +1,60 @@
"""The ssdp integration websocket apis."""
from __future__ import annotations
from dataclasses import asdict
from typing import Any, Final
import voluptuous as vol
from homeassistant.components import websocket_api
from homeassistant.core import HassJob, HomeAssistant, callback
from homeassistant.helpers.json import json_bytes
from homeassistant.helpers.service_info.ssdp import SsdpServiceInfo
from .const import DOMAIN, SSDP_SCANNER
from .scanner import Scanner, SsdpChange
FIELD_SSDP_ST: Final = "ssdp_st"
FIELD_SSDP_LOCATION: Final = "ssdp_location"
@callback
def async_setup(hass: HomeAssistant) -> None:
"""Set up the ssdp websocket API."""
websocket_api.async_register_command(hass, ws_subscribe_discovery)
@websocket_api.require_admin
@websocket_api.websocket_command(
{
vol.Required("type"): "ssdp/subscribe_discovery",
}
)
@websocket_api.async_response
async def ws_subscribe_discovery(
hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any]
) -> None:
"""Handle subscribe advertisements websocket command."""
scanner: Scanner = hass.data[DOMAIN][SSDP_SCANNER]
msg_id: int = msg["id"]
def _async_event_message(message: dict[str, Any]) -> None:
connection.send_message(
json_bytes(websocket_api.event_message(msg_id, message))
)
@callback
def _async_on_data(info: SsdpServiceInfo, change: SsdpChange) -> None:
if change is not SsdpChange.BYEBYE:
_async_event_message({"add": [asdict(info)]})
return
remove_msg = {
FIELD_SSDP_ST: info.ssdp_st,
FIELD_SSDP_LOCATION: info.ssdp_location,
}
_async_event_message({"remove": [remove_msg]})
job = HassJob(_async_on_data)
connection.send_message(json_bytes(websocket_api.result_message(msg_id)))
connection.subscriptions[msg_id] = await scanner.async_register_callback(job, None)

View File

@ -1 +1,27 @@
"""Tests for the SSDP integration."""
from __future__ import annotations
from datetime import datetime
from async_upnp_client.ssdp import udn_from_headers
from async_upnp_client.ssdp_listener import SsdpListener
from async_upnp_client.utils import CaseInsensitiveDict
from homeassistant.components import ssdp
from homeassistant.core import HomeAssistant
from homeassistant.setup import async_setup_component
async def init_ssdp_component(hass: HomeAssistant) -> SsdpListener:
"""Initialize ssdp component and get SsdpListener."""
await async_setup_component(hass, ssdp.DOMAIN, {ssdp.DOMAIN: {}})
await hass.async_block_till_done()
return hass.data[ssdp.DOMAIN][ssdp.SSDP_SCANNER]._ssdp_listeners[0]
def _ssdp_headers(headers) -> CaseInsensitiveDict:
"""Create a CaseInsensitiveDict with headers and a timestamp."""
ssdp_headers = CaseInsensitiveDict(headers, _timestamp=datetime.now())
ssdp_headers["_udn"] = udn_from_headers(ssdp_headers)
return ssdp_headers

View File

@ -1,14 +1,11 @@
"""Test the SSDP integration."""
from datetime import datetime
from ipaddress import IPv4Address
from typing import Any
from unittest.mock import ANY, AsyncMock, patch
from async_upnp_client.server import UpnpServer
from async_upnp_client.ssdp import udn_from_headers
from async_upnp_client.ssdp_listener import SsdpListener
from async_upnp_client.utils import CaseInsensitiveDict
import pytest
from homeassistant import config_entries
@ -39,9 +36,10 @@ from homeassistant.helpers.service_info.ssdp import (
ATTR_UPNP_UPC,
SsdpServiceInfo,
)
from homeassistant.setup import async_setup_component
from homeassistant.util import dt as dt_util
from . import _ssdp_headers, init_ssdp_component
from tests.common import (
MockConfigEntry,
MockModule,
@ -52,19 +50,6 @@ from tests.common import (
from tests.test_util.aiohttp import AiohttpClientMocker
def _ssdp_headers(headers):
ssdp_headers = CaseInsensitiveDict(headers, _timestamp=datetime.now())
ssdp_headers["_udn"] = udn_from_headers(ssdp_headers)
return ssdp_headers
async def init_ssdp_component(hass: HomeAssistant) -> SsdpListener:
"""Initialize ssdp component and get SsdpListener."""
await async_setup_component(hass, ssdp.DOMAIN, {ssdp.DOMAIN: {}})
await hass.async_block_till_done()
return hass.data[ssdp.DOMAIN][ssdp.SSDP_SCANNER]._ssdp_listeners[0]
@patch(
"homeassistant.components.ssdp.async_get_ssdp",
return_value={"mock-domain": [{"st": "mock-st"}]},

View File

@ -0,0 +1,136 @@
"""The tests for the ssdp WebSocket API."""
import asyncio
from unittest.mock import ANY, AsyncMock, Mock, patch
from homeassistant.core import EVENT_HOMEASSISTANT_STARTED, HomeAssistant
from . import _ssdp_headers, init_ssdp_component
from tests.test_util.aiohttp import AiohttpClientMocker
from tests.typing import WebSocketGenerator
@patch(
"homeassistant.components.ssdp.async_get_ssdp",
return_value={"mock-domain": [{"deviceType": "Paulus"}]},
)
async def test_subscribe_discovery(
mock_get_ssdp: Mock,
hass: HomeAssistant,
aioclient_mock: AiohttpClientMocker,
mock_flow_init: AsyncMock,
hass_ws_client: WebSocketGenerator,
) -> None:
"""Test ssdp subscribe_discovery."""
aioclient_mock.get(
"http://1.1.1.1",
text="""
<root>
<device>
<deviceType>Paulus</deviceType>
</device>
</root>
""",
)
ssdp_listener = await init_ssdp_component(hass)
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
await hass.async_block_till_done()
mock_ssdp_search_response = _ssdp_headers(
{
"st": "mock-st",
"location": "http://1.1.1.1",
"usn": "uuid:mock-udn::mock-st",
"_source": "search",
}
)
ssdp_listener._on_search(mock_ssdp_search_response)
await hass.async_block_till_done(wait_background_tasks=True)
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "ssdp/subscribe_discovery",
}
)
async with asyncio.timeout(1):
response = await client.receive_json()
assert response["success"]
async with asyncio.timeout(1):
response = await client.receive_json()
assert response["event"]["add"] == [
{
"ssdp_all_locations": ["http://1.1.1.1"],
"ssdp_ext": None,
"ssdp_headers": {
"_source": "search",
"_timestamp": ANY,
"_udn": "uuid:mock-udn",
"location": "http://1.1.1.1",
"st": "mock-st",
"usn": "uuid:mock-udn::mock-st",
},
"ssdp_location": "http://1.1.1.1",
"ssdp_nt": None,
"ssdp_server": None,
"ssdp_st": "mock-st",
"ssdp_udn": "uuid:mock-udn",
"ssdp_usn": "uuid:mock-udn::mock-st",
"upnp": {"UDN": "uuid:mock-udn", "deviceType": "Paulus"},
"x_homeassistant_matching_domains": [],
}
]
mock_ssdp_advertisement = _ssdp_headers(
{
"location": "http://1.1.1.1",
"usn": "uuid:mock-udn::mock-st",
"nt": "upnp:rootdevice",
"nts": "ssdp:alive",
"_source": "advertisement",
}
)
ssdp_listener._on_alive(mock_ssdp_advertisement)
await hass.async_block_till_done(wait_background_tasks=True)
async with asyncio.timeout(1):
response = await client.receive_json()
assert response["event"]["add"] == [
{
"ssdp_all_locations": ["http://1.1.1.1"],
"ssdp_ext": None,
"ssdp_headers": {
"_source": "advertisement",
"_timestamp": ANY,
"_udn": "uuid:mock-udn",
"location": "http://1.1.1.1",
"nt": "upnp:rootdevice",
"nts": "ssdp:alive",
"usn": "uuid:mock-udn::mock-st",
},
"ssdp_location": "http://1.1.1.1",
"ssdp_nt": "upnp:rootdevice",
"ssdp_server": None,
"ssdp_st": "upnp:rootdevice",
"ssdp_udn": "uuid:mock-udn",
"ssdp_usn": "uuid:mock-udn::mock-st",
"upnp": {"UDN": "uuid:mock-udn", "deviceType": "Paulus"},
"x_homeassistant_matching_domains": ["mock-domain"],
}
]
mock_ssdp_advertisement["nts"] = "ssdp:byebye"
ssdp_listener._on_byebye(mock_ssdp_advertisement)
await hass.async_block_till_done(wait_background_tasks=True)
async with asyncio.timeout(1):
response = await client.receive_json()
assert response["event"]["remove"] == [
{"ssdp_location": "http://1.1.1.1", "ssdp_st": "upnp:rootdevice"}
]