diff --git a/homeassistant/components/mqtt/__init__.py b/homeassistant/components/mqtt/__init__.py index 01956a85c36..86896e8309e 100644 --- a/homeassistant/components/mqtt/__init__.py +++ b/homeassistant/components/mqtt/__init__.py @@ -21,6 +21,7 @@ import homeassistant.helpers.config_validation as cv from homeassistant.const import ( EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP, CONF_PLATFORM, CONF_SCAN_INTERVAL, CONF_VALUE_TEMPLATE) +from homeassistant.util.async import run_callback_threadsafe _LOGGER = logging.getLogger(__name__) @@ -165,6 +166,18 @@ def publish_template(hass, topic, payload_template, qos=None, retain=None): def subscribe(hass, topic, callback, qos=DEFAULT_QOS): + """Subscribe to an MQTT topic.""" + async_remove = run_callback_threadsafe( + hass.loop, async_subscribe, hass, topic, callback, qos).result() + + def remove_mqtt(): + """Remove MQTT subscription.""" + run_callback_threadsafe(hass.loop, async_remove).result() + + return remove_mqtt + + +def async_subscribe(hass, topic, callback, qos=DEFAULT_QOS): """Subscribe to an MQTT topic.""" @asyncio.coroutine def mqtt_topic_subscriber(event): @@ -181,13 +194,13 @@ def subscribe(hass, topic, callback, qos=DEFAULT_QOS): event.data[ATTR_PAYLOAD], event.data[ATTR_QOS], priority=JobPriority.EVENT_CALLBACK) - remove = hass.bus.listen(EVENT_MQTT_MESSAGE_RECEIVED, - mqtt_topic_subscriber) + async_remove = hass.bus.async_listen(EVENT_MQTT_MESSAGE_RECEIVED, + mqtt_topic_subscriber) # Future: track subscriber count and unsubscribe in remove MQTT_CLIENT.subscribe(topic, qos) - return remove + return async_remove def _setup_server(hass, config):