From 2bfded7153cababe7702e2b187b7f229ec4e6b06 Mon Sep 17 00:00:00 2001 From: Paulus Schoutsen Date: Thu, 10 Nov 2016 22:45:38 -0800 Subject: [PATCH] MQTT.Server will use HASS eventloop (#3429) --- homeassistant/components/mqtt/server.py | 49 +++++-------------------- tests/components/mqtt/test_server.py | 18 ++++----- 2 files changed, 19 insertions(+), 48 deletions(-) diff --git a/homeassistant/components/mqtt/server.py b/homeassistant/components/mqtt/server.py index cffde56b319..cc240e41a30 100644 --- a/homeassistant/components/mqtt/server.py +++ b/homeassistant/components/mqtt/server.py @@ -4,41 +4,21 @@ Support for a local MQTT broker. For more details about this component, please refer to the documentation at https://home-assistant.io/components/mqtt/#use-the-embedded-broker """ -import asyncio import logging import tempfile -import threading +from homeassistant.core import callback from homeassistant.components.mqtt import PROTOCOL_311 from homeassistant.const import EVENT_HOMEASSISTANT_STOP +from homeassistant.util.async import run_coroutine_threadsafe REQUIREMENTS = ['hbmqtt==0.7.1'] DEPENDENCIES = ['http'] -@asyncio.coroutine -def broker_coro(loop, config): - """Start broker coroutine.""" - from hbmqtt.broker import Broker - broker = Broker(config, loop) - yield from broker.start() - return broker - - -def loop_run(loop, broker, shutdown_complete): - """Run broker and clean up when done.""" - loop.run_forever() - # run_forever ends when stop is called because we're shutting down - loop.run_until_complete(broker.shutdown()) - loop.close() - shutdown_complete.set() - - def start(hass, server_config): """Initialize MQTT Server.""" - from hbmqtt.broker import BrokerException - - loop = asyncio.new_event_loop() + from hbmqtt.broker import Broker, BrokerException try: passwd = tempfile.NamedTemporaryFile() @@ -48,29 +28,20 @@ def start(hass, server_config): else: client_config = None - start_server = asyncio.gather(broker_coro(loop, server_config), - loop=loop) - loop.run_until_complete(start_server) - # Result raises exception if one was raised during startup - broker = start_server.result()[0] + broker = Broker(server_config, hass.loop) + run_coroutine_threadsafe(broker.start(), hass.loop).result() except BrokerException: logging.getLogger(__name__).exception('Error initializing MQTT server') - loop.close() return False, None finally: passwd.close() - shutdown_complete = threading.Event() + @callback + def shutdown_mqtt_server(event): + """Shut down the MQTT server.""" + hass.async_add_job(broker.shutdown()) - def shutdown(event): - """Gracefully shutdown MQTT broker.""" - loop.call_soon_threadsafe(loop.stop) - shutdown_complete.wait() - - hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, shutdown) - - threading.Thread(target=loop_run, args=(loop, broker, shutdown_complete), - name="MQTT-server").start() + hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, shutdown_mqtt_server) return True, client_config diff --git a/tests/components/mqtt/test_server.py b/tests/components/mqtt/test_server.py index 7b0963da23c..03caf8cc4c0 100644 --- a/tests/components/mqtt/test_server.py +++ b/tests/components/mqtt/test_server.py @@ -13,6 +13,7 @@ class TestMQTT: def setup_method(self, method): """Setup things to be run when tests are started.""" self.hass = get_test_home_assistant() + self.hass.config.components.append('http') def teardown_method(self, method): """Stop everything that was started.""" @@ -20,12 +21,13 @@ class TestMQTT: @patch('passlib.apps.custom_app_context', Mock(return_value='')) @patch('tempfile.NamedTemporaryFile', Mock(return_value=MagicMock())) - @patch('asyncio.new_event_loop', Mock()) + @patch('homeassistant.components.mqtt.server.run_coroutine_threadsafe', + Mock(return_value=MagicMock())) + @patch('hbmqtt.broker.Broker', Mock(return_value=MagicMock())) @patch('homeassistant.components.mqtt.MQTT') - @patch('asyncio.gather') - def test_creating_config_with_http_pass(self, mock_gather, mock_mqtt): + def test_creating_config_with_http_pass(self, mock_mqtt): """Test if the MQTT server gets started and subscribe/publish msg.""" - self.hass.config.components.append('http') + self.hass.bus.listen_once = MagicMock() password = 'super_secret' self.hass.config.api = MagicMock(api_password=password) @@ -44,14 +46,12 @@ class TestMQTT: assert mock_mqtt.mock_calls[0][1][6] is None @patch('tempfile.NamedTemporaryFile', Mock(return_value=MagicMock())) - @patch('asyncio.new_event_loop', Mock()) - @patch('asyncio.gather') - def test_broker_config_fails(self, mock_gather): + @patch('homeassistant.components.mqtt.server.run_coroutine_threadsafe') + def test_broker_config_fails(self, mock_run): """Test if the MQTT component fails if server fails.""" - self.hass.config.components.append('http') from hbmqtt.broker import BrokerException - mock_gather.side_effect = BrokerException + mock_run.side_effect = BrokerException self.hass.config.api = MagicMock(api_password=None)