Bugfix mqtt socket memory error (#6501)

* Bugfix mqtt socket memory error

* Fix tests

* Fix lint
This commit is contained in:
Pascal Vizeli 2017-03-09 15:31:43 +01:00 committed by Paulus Schoutsen
parent 855756cb2a
commit 20fcd1f0e2
3 changed files with 15 additions and 34 deletions

View File

@ -23,8 +23,8 @@ from homeassistant.helpers.dispatcher import (
from homeassistant.util.async import ( from homeassistant.util.async import (
run_coroutine_threadsafe, run_callback_threadsafe) run_coroutine_threadsafe, run_callback_threadsafe)
from homeassistant.const import ( from homeassistant.const import (
EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP, CONF_VALUE_TEMPLATE, EVENT_HOMEASSISTANT_STOP, CONF_VALUE_TEMPLATE, CONF_USERNAME,
CONF_USERNAME, CONF_PASSWORD, CONF_PORT, CONF_PROTOCOL, CONF_PAYLOAD) CONF_PASSWORD, CONF_PORT, CONF_PROTOCOL, CONF_PAYLOAD)
from homeassistant.components.mqtt.server import HBMQTT_CONFIG_SCHEMA from homeassistant.components.mqtt.server import HBMQTT_CONFIG_SCHEMA
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -331,18 +331,11 @@ def async_setup(hass, config):
@asyncio.coroutine @asyncio.coroutine
def async_stop_mqtt(event): def async_stop_mqtt(event):
"""Stop MQTT component.""" """Stop MQTT component."""
yield from hass.data[DATA_MQTT].async_stop() yield from hass.data[DATA_MQTT].async_disconnect()
@asyncio.coroutine
def async_start_mqtt(event):
"""Launch MQTT component when Home Assistant starts up."""
yield from hass.data[DATA_MQTT].async_start()
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, async_stop_mqtt) hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, async_stop_mqtt)
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_START, async_start_mqtt)
success = yield from hass.data[DATA_MQTT].async_connect() success = yield from hass.data[DATA_MQTT].async_connect()
if not success: if not success:
return False return False
@ -442,13 +435,12 @@ class MQTT(object):
with (yield from self._paho_lock): with (yield from self._paho_lock):
yield from self.hass.loop.run_in_executor( yield from self.hass.loop.run_in_executor(
None, self._mqttc.publish, topic, payload, qos, retain) None, self._mqttc.publish, topic, payload, qos, retain)
yield from asyncio.sleep(0, loop=self.hass.loop)
@asyncio.coroutine @asyncio.coroutine
def async_connect(self): def async_connect(self):
"""Connect to the host. Does not process messages yet. """Connect to the host. Does process messages yet.
This method must be run in the event loop and returns a coroutine. This method is a coroutine.
""" """
result = yield from self.hass.loop.run_in_executor( result = yield from self.hass.loop.run_in_executor(
None, self._mqttc.connect, self.broker, self.port, self.keepalive) None, self._mqttc.connect, self.broker, self.port, self.keepalive)
@ -456,17 +448,12 @@ class MQTT(object):
if result != 0: if result != 0:
import paho.mqtt.client as mqtt import paho.mqtt.client as mqtt
_LOGGER.error('Failed to connect: %s', mqtt.error_string(result)) _LOGGER.error('Failed to connect: %s', mqtt.error_string(result))
else:
self._mqttc.loop_start()
return not result return not result
def async_start(self): def async_disconnect(self):
"""Run the MQTT client.
This method must be run in the event loop and returns a coroutine.
"""
return self.hass.loop.run_in_executor(None, self._mqttc.loop_start)
def async_stop(self):
"""Stop the MQTT client. """Stop the MQTT client.
This method must be run in the event loop and returns a coroutine. This method must be run in the event loop and returns a coroutine.
@ -493,7 +480,6 @@ class MQTT(object):
result, mid = yield from self.hass.loop.run_in_executor( result, mid = yield from self.hass.loop.run_in_executor(
None, self._mqttc.subscribe, topic, qos) None, self._mqttc.subscribe, topic, qos)
yield from asyncio.sleep(0, loop=self.hass.loop)
_raise_on_error(result) _raise_on_error(result)
self.progress[mid] = topic self.progress[mid] = topic

View File

@ -51,7 +51,7 @@ def async_dispatcher_connect(hass, signal, target):
def dispatcher_send(hass, signal, *args): def dispatcher_send(hass, signal, *args):
"""Send signal and data.""" """Send signal and data."""
hass.add_job(async_dispatcher_send, hass, signal, *args) hass.loop.call_soon_threadsafe(async_dispatcher_send, hass, signal, *args)
@callback @callback

View File

@ -11,8 +11,7 @@ from homeassistant.core import callback
from homeassistant.setup import setup_component, async_setup_component from homeassistant.setup import setup_component, async_setup_component
import homeassistant.components.mqtt as mqtt import homeassistant.components.mqtt as mqtt
from homeassistant.const import ( from homeassistant.const import (
EVENT_CALL_SERVICE, ATTR_DOMAIN, ATTR_SERVICE, EVENT_HOMEASSISTANT_START, EVENT_CALL_SERVICE, ATTR_DOMAIN, ATTR_SERVICE, EVENT_HOMEASSISTANT_STOP)
EVENT_HOMEASSISTANT_STOP)
from homeassistant.helpers.dispatcher import async_dispatcher_connect from homeassistant.helpers.dispatcher import async_dispatcher_connect
from tests.common import ( from tests.common import (
@ -55,19 +54,15 @@ class TestMQTT(unittest.TestCase):
"""Helper for recording calls.""" """Helper for recording calls."""
self.calls.append(args) self.calls.append(args)
def test_client_starts_on_home_assistant_start(self): def test_client_starts_on_home_assistant_mqtt_setup(self):
""""Test if client start on HA launch.""" """Test if client is connect after mqtt init on bootstrap."""
self.hass.bus.fire(EVENT_HOMEASSISTANT_START) assert self.hass.data['mqtt'].async_connect.called
self.hass.block_till_done()
self.assertTrue(self.hass.data['mqtt'].async_start.called)
def test_client_stops_on_home_assistant_start(self): def test_client_stops_on_home_assistant_start(self):
"""Test if client stops on HA launch.""" """Test if client stops on HA launch."""
self.hass.bus.fire(EVENT_HOMEASSISTANT_START)
self.hass.block_till_done()
self.hass.bus.fire(EVENT_HOMEASSISTANT_STOP) self.hass.bus.fire(EVENT_HOMEASSISTANT_STOP)
self.hass.block_till_done() self.hass.block_till_done()
self.assertTrue(self.hass.data['mqtt'].async_stop.called) self.assertTrue(self.hass.data['mqtt'].async_disconnect.called)
def test_publish_calls_service(self): def test_publish_calls_service(self):
"""Test the publishing of call to services.""" """Test the publishing of call to services."""