Reconnect when disconnected from MQTT

This commit is contained in:
Paulus Schoutsen 2015-11-22 15:09:56 -08:00
parent 7f1254d750
commit f170799182

View File

@ -6,10 +6,12 @@ MQTT component, using paho-mqtt.
For more details about this component, please refer to the documentation at For more details about this component, please refer to the documentation at
https://home-assistant.io/components/mqtt/ https://home-assistant.io/components/mqtt/
""" """
import json
import logging import logging
import os import os
import socket import socket
import json import time
from homeassistant.exceptions import HomeAssistantError from homeassistant.exceptions import HomeAssistantError
import homeassistant.util as util import homeassistant.util as util
@ -45,6 +47,8 @@ ATTR_TOPIC = 'topic'
ATTR_PAYLOAD = 'payload' ATTR_PAYLOAD = 'payload'
ATTR_QOS = 'qos' ATTR_QOS = 'qos'
MAX_RECONNECT_WAIT = 300 # seconds
def publish(hass, topic, payload, qos=None): def publish(hass, topic, payload, qos=None):
""" Send an MQTT message. """ """ Send an MQTT message. """
@ -66,9 +70,7 @@ def subscribe(hass, topic, callback, qos=DEFAULT_QOS):
event.data[ATTR_QOS]) event.data[ATTR_QOS])
hass.bus.listen(EVENT_MQTT_MESSAGE_RECEIVED, mqtt_topic_subscriber) hass.bus.listen(EVENT_MQTT_MESSAGE_RECEIVED, mqtt_topic_subscriber)
MQTT_CLIENT.subscribe(topic, qos)
if topic not in MQTT_CLIENT.topics:
MQTT_CLIENT.subscribe(topic, qos)
def setup(hass, config): def setup(hass, config):
@ -162,24 +164,30 @@ class MQTT(object): # pragma: no cover
password, certificate): password, certificate):
import paho.mqtt.client as mqtt import paho.mqtt.client as mqtt
self.hass = hass self.userdata = {
self._progress = {} 'hass': hass,
self.topics = {} 'topics': {},
'progress': {},
}
if client_id is None: if client_id is None:
self._mqttc = mqtt.Client() self._mqttc = mqtt.Client()
else: else:
self._mqttc = mqtt.Client(client_id) self._mqttc = mqtt.Client(client_id)
self._mqttc.user_data_set(self.userdata)
if username is not None: if username is not None:
self._mqttc.username_pw_set(username, password) self._mqttc.username_pw_set(username, password)
if certificate is not None: if certificate is not None:
self._mqttc.tls_set(certificate) self._mqttc.tls_set(certificate)
self._mqttc.on_subscribe = self._mqtt_on_subscribe self._mqttc.on_subscribe = _mqtt_on_subscribe
self._mqttc.on_unsubscribe = self._mqtt_on_unsubscribe self._mqttc.on_unsubscribe = _mqtt_on_unsubscribe
self._mqttc.on_connect = self._mqtt_on_connect self._mqttc.on_connect = _mqtt_on_connect
self._mqttc.on_message = self._mqtt_on_message self._mqttc.on_disconnect = _mqtt_on_disconnect
self._mqttc.on_message = _mqtt_on_message
self._mqttc.connect(broker, port, keepalive) self._mqttc.connect(broker, port, keepalive)
def publish(self, topic, payload, qos): def publish(self, topic, payload, qos):
@ -190,7 +198,7 @@ class MQTT(object): # pragma: no cover
""" Unsubscribe from topic. """ """ Unsubscribe from topic. """
result, mid = self._mqttc.unsubscribe(topic) result, mid = self._mqttc.unsubscribe(topic)
_raise_on_error(result) _raise_on_error(result)
self._progress[mid] = topic self.userdata['progress'][mid] = topic
def start(self): def start(self):
""" Run the MQTT client. """ """ Run the MQTT client. """
@ -202,55 +210,87 @@ class MQTT(object): # pragma: no cover
def subscribe(self, topic, qos): def subscribe(self, topic, qos):
""" Subscribe to a topic. """ """ Subscribe to a topic. """
if topic in self.topics: if topic in self.userdata['topics']:
return return
result, mid = self._mqttc.subscribe(topic, qos) result, mid = self._mqttc.subscribe(topic, qos)
_raise_on_error(result) _raise_on_error(result)
self._progress[mid] = topic self.userdata['progress'][mid] = topic
self.topics[topic] = None self.userdata['topics'][topic] = None
def _mqtt_on_connect(self, mqttc, obj, flags, result_code):
""" On connect, resubscribe to all topics we were subscribed to. """
if result_code != 0:
_LOGGER.error('Unable to connect to the MQTT broker: %s', {
1: 'Incorrect protocol version',
2: 'Invalid client identifier',
3: 'Server unavailable',
4: 'Bad username or password',
5: 'Not authorised'
}.get(result_code))
self._mqttc.disconnect()
return
old_topics = self.topics def _mqtt_on_message(mqttc, userdata, msg):
self._progress = {} """ Message callback """
self.topics = {} userdata['hass'].bus.fire(EVENT_MQTT_MESSAGE_RECEIVED, {
for topic, qos in old_topics.items(): ATTR_TOPIC: msg.topic,
# qos is None if we were in process of subscribing ATTR_QOS: msg.qos,
if qos is not None: ATTR_PAYLOAD: msg.payload.decode('utf-8'),
self._mqttc.subscribe(topic, qos) })
def _mqtt_on_subscribe(self, mqttc, obj, mid, granted_qos):
""" Called when subscribe succesfull. """
topic = self._progress.pop(mid, None)
if topic is None:
return
self.topics[topic] = granted_qos
def _mqtt_on_unsubscribe(self, mqttc, obj, mid, granted_qos): def _mqtt_on_connect(mqttc, userdata, flags, result_code):
""" Called when subscribe succesfull. """ """ On connect, resubscribe to all topics we were subscribed to. """
topic = self._progress.pop(mid, None) if result_code != 0:
if topic is None: _LOGGER.error('Unable to connect to the MQTT broker: %s', {
return 1: 'Incorrect protocol version',
self.topics.pop(topic, None) 2: 'Invalid client identifier',
3: 'Server unavailable',
4: 'Bad username or password',
5: 'Not authorised'
}.get(result_code))
mqttc.disconnect()
return
def _mqtt_on_message(self, mqttc, obj, msg): old_topics = userdata['topics']
""" Message callback """
self.hass.bus.fire(EVENT_MQTT_MESSAGE_RECEIVED, { userdata['topics'] = {}
ATTR_TOPIC: msg.topic, userdata['progress'] = {}
ATTR_QOS: msg.qos,
ATTR_PAYLOAD: msg.payload.decode('utf-8'), for topic, qos in old_topics.items():
}) # qos is None if we were in process of subscribing
if qos is not None:
mqttc.subscribe(topic, qos)
def _mqtt_on_subscribe(mqttc, userdata, mid, granted_qos):
""" Called when subscribe successfull. """
topic = userdata['progress'].pop(mid, None)
if topic is None:
return
userdata['topics'][topic] = granted_qos
def _mqtt_on_unsubscribe(mqttc, userdata, mid, granted_qos):
""" Called when subscribe successfull. """
topic = userdata['progress'].pop(mid, None)
if topic is None:
return
userdata['topics'].pop(topic, None)
def _mqtt_on_disconnect(mqttc, userdata, result_code):
""" Called when being disconnected. """
# When disconnected because of calling disconnect()
if result_code == 0:
return
tries = 0
wait_time = 0
while True:
try:
if mqttc.reconnect() == 0:
_LOGGER.info('Successfully reconnected to the MQTT server')
break
except socket.error:
pass
wait_time = min(2**tries, MAX_RECONNECT_WAIT)
_LOGGER.warning(
'Disconnected from MQTT (%s). Trying to reconnect in %ss',
result_code, wait_time)
# It is ok to sleep here as we are in the MQTT thread.
time.sleep(wait_time)
tries += 1
def _raise_on_error(result): # pragma: no cover def _raise_on_error(result): # pragma: no cover