Make emulated_hue upnp responder async (#39126)

This commit is contained in:
J. Nick Koston 2020-08-23 02:50:59 -05:00 committed by GitHub
parent dfa18b4b6a
commit 3cc099af80
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 166 additions and 139 deletions

View File

@ -220,7 +220,6 @@ omit =
homeassistant/components/emby/media_player.py homeassistant/components/emby/media_player.py
homeassistant/components/emoncms/sensor.py homeassistant/components/emoncms/sensor.py
homeassistant/components/emoncms_history/* homeassistant/components/emoncms_history/*
homeassistant/components/emulated_hue/upnp.py
homeassistant/components/enigma2/media_player.py homeassistant/components/enigma2/media_player.py
homeassistant/components/enocean/__init__.py homeassistant/components/enocean/__init__.py
homeassistant/components/enocean/binary_sensor.py homeassistant/components/enocean/binary_sensor.py

View File

@ -21,7 +21,7 @@ from .hue_api import (
HueUnauthorizedUser, HueUnauthorizedUser,
HueUsernameView, HueUsernameView,
) )
from .upnp import DescriptionXmlView, UPNPResponderThread from .upnp import DescriptionXmlView, create_upnp_datagram_endpoint
DOMAIN = "emulated_hue" DOMAIN = "emulated_hue"
@ -120,17 +120,22 @@ async def async_setup(hass, yaml_config):
HueGroupView(config).register(app, app.router) HueGroupView(config).register(app, app.router)
HueFullStateView(config).register(app, app.router) HueFullStateView(config).register(app, app.router)
upnp_listener = UPNPResponderThread( listen = create_upnp_datagram_endpoint(
config.host_ip_addr, config.host_ip_addr,
config.listen_port,
config.upnp_bind_multicast, config.upnp_bind_multicast,
config.advertise_ip, config.advertise_ip,
config.advertise_port, config.advertise_port or config.listen_port,
) )
protocol = None
async def stop_emulated_hue_bridge(event): async def stop_emulated_hue_bridge(event):
"""Stop the emulated hue bridge.""" """Stop the emulated hue bridge."""
upnp_listener.stop() nonlocal protocol
nonlocal site
nonlocal runner
if protocol:
protocol.close()
if site: if site:
await site.stop() await site.stop()
if runner: if runner:
@ -138,10 +143,12 @@ async def async_setup(hass, yaml_config):
async def start_emulated_hue_bridge(event): async def start_emulated_hue_bridge(event):
"""Start the emulated hue bridge.""" """Start the emulated hue bridge."""
upnp_listener.start() nonlocal protocol
nonlocal site nonlocal site
nonlocal runner nonlocal runner
_, protocol = await listen
runner = web.AppRunner(app) runner = web.AppRunner(app)
await runner.setup() await runner.setup()
@ -153,6 +160,8 @@ async def async_setup(hass, yaml_config):
_LOGGER.error( _LOGGER.error(
"Failed to create HTTP server at port %d: %s", config.listen_port, error "Failed to create HTTP server at port %d: %s", config.listen_port, error
) )
if protocol:
protocol.close()
else: else:
hass.bus.async_listen_once( hass.bus.async_listen_once(
EVENT_HOMEASSISTANT_STOP, stop_emulated_hue_bridge EVENT_HOMEASSISTANT_STOP, stop_emulated_hue_bridge

View File

@ -1,8 +1,7 @@
"""Support UPNP discovery method that mimics Hue hubs.""" """Support UPNP discovery method that mimics Hue hubs."""
import asyncio
import logging import logging
import select
import socket import socket
import threading
from aiohttp import web from aiohttp import web
@ -13,6 +12,9 @@ from .const import HUE_SERIAL_NUMBER, HUE_UUID
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
BROADCAST_PORT = 1900
BROADCAST_ADDR = "239.255.255.250"
class DescriptionXmlView(HomeAssistantView): class DescriptionXmlView(HomeAssistantView):
"""Handles requests for the description.xml file.""" """Handles requests for the description.xml file."""
@ -53,106 +55,95 @@ class DescriptionXmlView(HomeAssistantView):
return web.Response(text=resp_text, content_type="text/xml") return web.Response(text=resp_text, content_type="text/xml")
class UPNPResponderThread(threading.Thread): @core.callback
def create_upnp_datagram_endpoint(
host_ip_addr, upnp_bind_multicast, advertise_ip, advertise_port,
):
"""Create the UPNP socket and protocol."""
# Listen for UDP port 1900 packets sent to SSDP multicast address
ssdp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
ssdp_socket.setblocking(False)
# Required for receiving multicast
ssdp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
ssdp_socket.setsockopt(
socket.SOL_IP, socket.IP_MULTICAST_IF, socket.inet_aton(host_ip_addr)
)
ssdp_socket.setsockopt(
socket.SOL_IP,
socket.IP_ADD_MEMBERSHIP,
socket.inet_aton(BROADCAST_ADDR) + socket.inet_aton(host_ip_addr),
)
ssdp_socket.bind(("" if upnp_bind_multicast else host_ip_addr, BROADCAST_PORT))
loop = asyncio.get_event_loop()
return loop.create_datagram_endpoint(
lambda: UPNPResponderProtocol(loop, ssdp_socket, advertise_ip, advertise_port),
sock=ssdp_socket,
)
class UPNPResponderProtocol:
"""Handle responding to UPNP/SSDP discovery requests.""" """Handle responding to UPNP/SSDP discovery requests."""
_interrupted = False def __init__(self, loop, ssdp_socket, advertise_ip, advertise_port):
def __init__(
self,
host_ip_addr,
listen_port,
upnp_bind_multicast,
advertise_ip,
advertise_port,
):
"""Initialize the class.""" """Initialize the class."""
threading.Thread.__init__(self) self.transport = None
self._loop = loop
self.host_ip_addr = host_ip_addr self._sock = ssdp_socket
self.listen_port = listen_port
self.upnp_bind_multicast = upnp_bind_multicast
self.advertise_ip = advertise_ip self.advertise_ip = advertise_ip
self.advertise_port = advertise_port self.advertise_port = advertise_port
self._ssdp_socket = None self._upnp_root_response = self._prepare_response(
"upnp:rootdevice", f"uuid:{HUE_UUID}::upnp:rootdevice"
def run(self):
"""Run the server."""
# Listen for UDP port 1900 packets sent to SSDP multicast address
self._ssdp_socket = ssdp_socket = socket.socket(
socket.AF_INET, socket.SOCK_DGRAM
) )
ssdp_socket.setblocking(False) self._upnp_device_response = self._prepare_response(
# Required for receiving multicast
ssdp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
ssdp_socket.setsockopt(
socket.SOL_IP, socket.IP_MULTICAST_IF, socket.inet_aton(self.host_ip_addr)
)
ssdp_socket.setsockopt(
socket.SOL_IP,
socket.IP_ADD_MEMBERSHIP,
socket.inet_aton("239.255.255.250") + socket.inet_aton(self.host_ip_addr),
)
if self.upnp_bind_multicast:
ssdp_socket.bind(("", 1900))
else:
ssdp_socket.bind((self.host_ip_addr, 1900))
while True:
if self._interrupted:
return
try:
read, _, _ = select.select([ssdp_socket], [], [ssdp_socket], 2)
if ssdp_socket in read:
data, addr = ssdp_socket.recvfrom(1024)
else:
# most likely the timeout, so check for interrupt
continue
except OSError as ex:
if self._interrupted:
return
_LOGGER.error(
"UPNP Responder socket exception occurred: %s", ex.__str__
)
# without the following continue, a second exception occurs
# because the data object has not been initialized
continue
if "M-SEARCH" in data.decode("utf-8", errors="ignore"):
_LOGGER.debug("UPNP Responder M-SEARCH method received: %s", data)
# SSDP M-SEARCH method received, respond to it with our info
response = self._handle_request(data)
resp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
resp_socket.sendto(response, addr)
_LOGGER.debug("UPNP Responder responding with: %s", response)
resp_socket.close()
def stop(self):
"""Stop the server."""
# Request for server
self._interrupted = True
if self._ssdp_socket:
clean_socket_close(self._ssdp_socket)
self.join()
def _handle_request(self, data):
if "upnp:rootdevice" in data.decode("utf-8", errors="ignore"):
return self._prepare_response(
"upnp:rootdevice", f"uuid:{HUE_UUID}::upnp:rootdevice"
)
return self._prepare_response(
"urn:schemas-upnp-org:device:basic:1", f"uuid:{HUE_UUID}" "urn:schemas-upnp-org:device:basic:1", f"uuid:{HUE_UUID}"
) )
def connection_made(self, transport):
"""Set the transport."""
self.transport = transport
def connection_lost(self, exc):
"""Handle connection lost."""
def datagram_received(self, data, addr):
"""Respond to msearch packets."""
decoded_data = data.decode("utf-8", errors="ignore")
if "M-SEARCH" not in decoded_data:
return
_LOGGER.debug("UPNP Responder M-SEARCH method received: %s", data)
# SSDP M-SEARCH method received, respond to it with our info
response = self._handle_request(decoded_data)
_LOGGER.debug("UPNP Responder responding with: %s", response)
self.transport.sendto(response, addr)
def error_received(self, exc): # pylint: disable=no-self-use
"""Log UPNP errors."""
_LOGGER.error("UPNP Error received: %s", exc)
def close(self):
"""Stop the server."""
_LOGGER.info("UPNP responder shutting down")
if self.transport:
self.transport.close()
self._loop.remove_writer(self._sock.fileno())
self._loop.remove_reader(self._sock.fileno())
self._sock.close()
def _handle_request(self, decoded_data):
if "upnp:rootdevice" in decoded_data:
return self._upnp_root_response
return self._upnp_device_response
def _prepare_response(self, search_target, unique_service_name): def _prepare_response(self, search_target, unique_service_name):
# Note that the double newline at the end of # Note that the double newline at the end of
# this string is required per the SSDP spec # this string is required per the SSDP spec
@ -167,10 +158,3 @@ USN: {unique_service_name}
""" """
return response.replace("\n", "\r\n").encode("utf-8") return response.replace("\n", "\r\n").encode("utf-8")
def clean_socket_close(sock):
"""Close a socket connection and logs its closure."""
_LOGGER.info("UPNP responder shutting down")
sock.close()

View File

@ -96,7 +96,7 @@ def hass_hue(loop, hass):
) )
) )
with patch("homeassistant.components.emulated_hue.UPNPResponderThread"): with patch("homeassistant.components.emulated_hue.create_upnp_datagram_endpoint"):
loop.run_until_complete( loop.run_until_complete(
setup.async_setup_component( setup.async_setup_component(
hass, hass,

View File

@ -8,9 +8,9 @@ import requests
from homeassistant import const, setup from homeassistant import const, setup
from homeassistant.components import emulated_hue from homeassistant.components import emulated_hue
from homeassistant.components.emulated_hue import upnp
from homeassistant.const import HTTP_OK from homeassistant.const import HTTP_OK
from tests.async_mock import patch
from tests.common import get_test_home_assistant, get_test_instance_port from tests.common import get_test_home_assistant, get_test_instance_port
HTTP_SERVER_PORT = get_test_instance_port() HTTP_SERVER_PORT = get_test_instance_port()
@ -20,6 +20,18 @@ BRIDGE_URL_BASE = f"http://127.0.0.1:{BRIDGE_SERVER_PORT}" + "{}"
JSON_HEADERS = {CONTENT_TYPE: const.CONTENT_TYPE_JSON} JSON_HEADERS = {CONTENT_TYPE: const.CONTENT_TYPE_JSON}
class MockTransport:
"""Mock asyncio transport."""
def __init__(self):
"""Create a place to store the sends."""
self.sends = []
def sendto(self, response, addr):
"""Mock sendto."""
self.sends.append((response, addr))
class TestEmulatedHue(unittest.TestCase): class TestEmulatedHue(unittest.TestCase):
"""Test the emulated Hue component.""" """Test the emulated Hue component."""
@ -30,16 +42,11 @@ class TestEmulatedHue(unittest.TestCase):
"""Set up the class.""" """Set up the class."""
cls.hass = hass = get_test_home_assistant() cls.hass = hass = get_test_home_assistant()
with patch("homeassistant.components.emulated_hue.UPNPResponderThread"): setup.setup_component(
setup.setup_component( hass,
hass, emulated_hue.DOMAIN,
emulated_hue.DOMAIN, {emulated_hue.DOMAIN: {emulated_hue.CONF_LISTEN_PORT: BRIDGE_SERVER_PORT}},
{ )
emulated_hue.DOMAIN: {
emulated_hue.CONF_LISTEN_PORT: BRIDGE_SERVER_PORT
}
},
)
cls.hass.start() cls.hass.start()
@ -50,23 +57,24 @@ class TestEmulatedHue(unittest.TestCase):
def test_upnp_discovery_basic(self): def test_upnp_discovery_basic(self):
"""Tests the UPnP basic discovery response.""" """Tests the UPnP basic discovery response."""
with patch("threading.Thread.__init__"): upnp_responder_protocol = upnp.UPNPResponderProtocol(
upnp_responder_thread = emulated_hue.UPNPResponderThread( None, None, "192.0.2.42", 8080
"0.0.0.0", 80, True, "192.0.2.42", 8080 )
) mock_transport = MockTransport()
upnp_responder_protocol.transport = mock_transport
"""Original request emitted by the Hue Bridge v1 app.""" """Original request emitted by the Hue Bridge v1 app."""
request = """M-SEARCH * HTTP/1.1 request = """M-SEARCH * HTTP/1.1
HOST:239.255.255.250:1900 HOST:239.255.255.250:1900
ST:ssdp:all ST:ssdp:all
Man:"ssdp:discover" Man:"ssdp:discover"
MX:3 MX:3
""" """
encoded_request = request.replace("\n", "\r\n").encode("utf-8") encoded_request = request.replace("\n", "\r\n").encode("utf-8")
response = upnp_responder_thread._handle_request(encoded_request) upnp_responder_protocol.datagram_received(encoded_request, 1234)
expected_response = """HTTP/1.1 200 OK expected_response = """HTTP/1.1 200 OK
CACHE-CONTROL: max-age=60 CACHE-CONTROL: max-age=60
EXT: EXT:
LOCATION: http://192.0.2.42:8080/description.xml LOCATION: http://192.0.2.42:8080/description.xml
@ -76,27 +84,30 @@ ST: urn:schemas-upnp-org:device:basic:1
USN: uuid:2f402f80-da50-11e1-9b23-001788255acc USN: uuid:2f402f80-da50-11e1-9b23-001788255acc
""" """
assert expected_response.replace("\n", "\r\n").encode("utf-8") == response expected_send = expected_response.replace("\n", "\r\n").encode("utf-8")
assert mock_transport.sends == [(expected_send, 1234)]
def test_upnp_discovery_rootdevice(self): def test_upnp_discovery_rootdevice(self):
"""Tests the UPnP rootdevice discovery response.""" """Tests the UPnP rootdevice discovery response."""
with patch("threading.Thread.__init__"): upnp_responder_protocol = upnp.UPNPResponderProtocol(
upnp_responder_thread = emulated_hue.UPNPResponderThread( None, None, "192.0.2.42", 8080
"0.0.0.0", 80, True, "192.0.2.42", 8080 )
) mock_transport = MockTransport()
upnp_responder_protocol.transport = mock_transport
"""Original request emitted by Busch-Jaeger free@home SysAP.""" """Original request emitted by Busch-Jaeger free@home SysAP."""
request = """M-SEARCH * HTTP/1.1 request = """M-SEARCH * HTTP/1.1
HOST: 239.255.255.250:1900 HOST: 239.255.255.250:1900
MAN: "ssdp:discover" MAN: "ssdp:discover"
MX: 40 MX: 40
ST: upnp:rootdevice ST: upnp:rootdevice
""" """
encoded_request = request.replace("\n", "\r\n").encode("utf-8") encoded_request = request.replace("\n", "\r\n").encode("utf-8")
response = upnp_responder_thread._handle_request(encoded_request) upnp_responder_protocol.datagram_received(encoded_request, 1234)
expected_response = """HTTP/1.1 200 OK expected_response = """HTTP/1.1 200 OK
CACHE-CONTROL: max-age=60 CACHE-CONTROL: max-age=60
EXT: EXT:
LOCATION: http://192.0.2.42:8080/description.xml LOCATION: http://192.0.2.42:8080/description.xml
@ -106,7 +117,31 @@ ST: upnp:rootdevice
USN: uuid:2f402f80-da50-11e1-9b23-001788255acc::upnp:rootdevice USN: uuid:2f402f80-da50-11e1-9b23-001788255acc::upnp:rootdevice
""" """
assert expected_response.replace("\n", "\r\n").encode("utf-8") == response expected_send = expected_response.replace("\n", "\r\n").encode("utf-8")
assert mock_transport.sends == [(expected_send, 1234)]
def test_upnp_no_response(self):
"""Tests the UPnP does not response on an invalid request."""
upnp_responder_protocol = upnp.UPNPResponderProtocol(
None, None, "192.0.2.42", 8080
)
mock_transport = MockTransport()
upnp_responder_protocol.transport = mock_transport
"""Original request emitted by the Hue Bridge v1 app."""
request = """INVALID * HTTP/1.1
HOST:239.255.255.250:1900
ST:ssdp:all
Man:"ssdp:discover"
MX:3
"""
encoded_request = request.replace("\n", "\r\n").encode("utf-8")
upnp_responder_protocol.datagram_received(encoded_request, 1234)
assert mock_transport.sends == []
def test_description_xml(self): def test_description_xml(self):
"""Test the description.""" """Test the description."""