MQTT.Server will use HASS eventloop (#3429)

This commit is contained in:
Paulus Schoutsen 2016-11-10 22:45:38 -08:00 committed by GitHub
parent 20af5cb5b4
commit 2bfded7153
2 changed files with 19 additions and 48 deletions

View File

@ -4,41 +4,21 @@ Support for a local MQTT broker.
For more details about this component, please refer to the documentation at
https://home-assistant.io/components/mqtt/#use-the-embedded-broker
"""
import asyncio
import logging
import tempfile
import threading
from homeassistant.core import callback
from homeassistant.components.mqtt import PROTOCOL_311
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
from homeassistant.util.async import run_coroutine_threadsafe
REQUIREMENTS = ['hbmqtt==0.7.1']
DEPENDENCIES = ['http']
@asyncio.coroutine
def broker_coro(loop, config):
"""Start broker coroutine."""
from hbmqtt.broker import Broker
broker = Broker(config, loop)
yield from broker.start()
return broker
def loop_run(loop, broker, shutdown_complete):
"""Run broker and clean up when done."""
loop.run_forever()
# run_forever ends when stop is called because we're shutting down
loop.run_until_complete(broker.shutdown())
loop.close()
shutdown_complete.set()
def start(hass, server_config):
"""Initialize MQTT Server."""
from hbmqtt.broker import BrokerException
loop = asyncio.new_event_loop()
from hbmqtt.broker import Broker, BrokerException
try:
passwd = tempfile.NamedTemporaryFile()
@ -48,29 +28,20 @@ def start(hass, server_config):
else:
client_config = None
start_server = asyncio.gather(broker_coro(loop, server_config),
loop=loop)
loop.run_until_complete(start_server)
# Result raises exception if one was raised during startup
broker = start_server.result()[0]
broker = Broker(server_config, hass.loop)
run_coroutine_threadsafe(broker.start(), hass.loop).result()
except BrokerException:
logging.getLogger(__name__).exception('Error initializing MQTT server')
loop.close()
return False, None
finally:
passwd.close()
shutdown_complete = threading.Event()
@callback
def shutdown_mqtt_server(event):
"""Shut down the MQTT server."""
hass.async_add_job(broker.shutdown())
def shutdown(event):
"""Gracefully shutdown MQTT broker."""
loop.call_soon_threadsafe(loop.stop)
shutdown_complete.wait()
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, shutdown)
threading.Thread(target=loop_run, args=(loop, broker, shutdown_complete),
name="MQTT-server").start()
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, shutdown_mqtt_server)
return True, client_config

View File

@ -13,6 +13,7 @@ class TestMQTT:
def setup_method(self, method):
"""Setup things to be run when tests are started."""
self.hass = get_test_home_assistant()
self.hass.config.components.append('http')
def teardown_method(self, method):
"""Stop everything that was started."""
@ -20,12 +21,13 @@ class TestMQTT:
@patch('passlib.apps.custom_app_context', Mock(return_value=''))
@patch('tempfile.NamedTemporaryFile', Mock(return_value=MagicMock()))
@patch('asyncio.new_event_loop', Mock())
@patch('homeassistant.components.mqtt.server.run_coroutine_threadsafe',
Mock(return_value=MagicMock()))
@patch('hbmqtt.broker.Broker', Mock(return_value=MagicMock()))
@patch('homeassistant.components.mqtt.MQTT')
@patch('asyncio.gather')
def test_creating_config_with_http_pass(self, mock_gather, mock_mqtt):
def test_creating_config_with_http_pass(self, mock_mqtt):
"""Test if the MQTT server gets started and subscribe/publish msg."""
self.hass.config.components.append('http')
self.hass.bus.listen_once = MagicMock()
password = 'super_secret'
self.hass.config.api = MagicMock(api_password=password)
@ -44,14 +46,12 @@ class TestMQTT:
assert mock_mqtt.mock_calls[0][1][6] is None
@patch('tempfile.NamedTemporaryFile', Mock(return_value=MagicMock()))
@patch('asyncio.new_event_loop', Mock())
@patch('asyncio.gather')
def test_broker_config_fails(self, mock_gather):
@patch('homeassistant.components.mqtt.server.run_coroutine_threadsafe')
def test_broker_config_fails(self, mock_run):
"""Test if the MQTT component fails if server fails."""
self.hass.config.components.append('http')
from hbmqtt.broker import BrokerException
mock_gather.side_effect = BrokerException
mock_run.side_effect = BrokerException
self.hass.config.api = MagicMock(api_password=None)