Clean up MQTT component

This commit is contained in:
Paulus Schoutsen 2015-08-08 23:49:38 -07:00
parent b37471af68
commit a2c6dbf479

View File

@ -29,8 +29,8 @@ port
The network port to connect to. Default is 1883. The network port to connect to. Default is 1883.
topic topic
*Required *Optional
The MQTT topic to subscribe to, e.g. home-assistant. The MQTT topic to subscribe to. Default is home-assistant.
keepalive keepalive
*Optional *Optional
@ -42,62 +42,72 @@ Quality of service level to use for the subscription.
0, 1, or 2, defaults to 0. 0, 1, or 2, defaults to 0.
""" """
import logging import logging
import socket
import homeassistant.util as util
from homeassistant.helpers import validate_config from homeassistant.helpers import validate_config
from homeassistant.const import ( from homeassistant.const import (
EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP) EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP)
try:
import paho.mqtt.client as mqtt
except ImportError:
mqtt = None
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
DOMAIN = "mqtt" DOMAIN = "mqtt"
DEPENDENCIES = []
MQTT_CLIENT = None MQTT_CLIENT = None
MQTT_CLIENT_ID = 'home-assistant'
MQTT_DEFAULT_PORT = 1883 DEFAULT_TOPIC = 'home-assistant'
MQTT_DEFAULT_KEEPALIVE = 60 DEFAULT_PORT = 1883
MQTT_DEFAULT_QOS = 0 DEFAULT_KEEPALIVE = 60
MQTT_SEND = 'mqtt_send' DEFAULT_QOS = 0
SERVICE_PUBLISH = 'publish'
EVENT_MQTT_MESSAGE_RECEIVED = 'MQTT_MESSAGE_RECEIVED' EVENT_MQTT_MESSAGE_RECEIVED = 'MQTT_MESSAGE_RECEIVED'
DEPENDENCIES = []
REQUIREMENTS = ['paho-mqtt>=1.1'] REQUIREMENTS = ['paho-mqtt>=1.1']
CONF_BROKER = 'broker'
CONF_PORT = 'port'
CONF_TOPIC = 'topic'
CONF_KEEPALIVE = 'keepalive'
CONF_QOS = 'qos'
ATTR_QOS = 'qos'
ATTR_TOPIC = 'topic'
ATTR_SUBTOPIC = 'subtopic' ATTR_SUBTOPIC = 'subtopic'
ATTR_PAYLOAD = 'payload' ATTR_PAYLOAD = 'payload'
def send_message(hass, subtopic, payload): def publish(hass, payload, subtopic=None):
""" Send an MQTT message. """ """ Send an MQTT message. """
hass.services.call(DOMAIN, MQTT_SEND, {ATTR_SUBTOPIC: subtopic, data = {ATTR_PAYLOAD: payload}
ATTR_PAYLOAD: payload}) if subtopic is not None:
data[ATTR_SUBTOPIC] = subtopic
hass.services.call(DOMAIN, SERVICE_PUBLISH, data)
def setup(hass, config): def setup(hass, config):
""" Get the MQTT protocol service. """ """ Get the MQTT protocol service. """
if not validate_config(config, if not validate_config(config, {DOMAIN: ['broker']}, _LOGGER):
{DOMAIN: ['broker',
'topic']},
_LOGGER):
return False return False
if mqtt is None: conf = config[DOMAIN]
_LOGGER.error("Error while importing dependency 'paho-mqtt'.")
return False broker = conf[CONF_BROKER]
port = util.convert(conf.get(CONF_PORT), int, DEFAULT_PORT)
topic = util.convert(conf.get(CONF_TOPIC), str, DEFAULT_TOPIC)
keepalive = util.convert(conf.get(CONF_KEEPALIVE), int, DEFAULT_KEEPALIVE)
qos = util.convert(conf.get(CONF_QOS), int, DEFAULT_QOS)
global MQTT_CLIENT global MQTT_CLIENT
try:
broker = config[DOMAIN]['broker'] MQTT_CLIENT = MQTT(hass, broker, port, keepalive, qos)
port = config[DOMAIN].get('port', MQTT_DEFAULT_PORT) except socket.error:
topic = config[DOMAIN]['topic'] _LOGGER.exception("Can't connect to the broker. "
keepalive = config[DOMAIN].get('keepalive', MQTT_DEFAULT_KEEPALIVE) "Please check your settings and the broker "
qos = config[DOMAIN].get('qos', MQTT_DEFAULT_QOS) "itself.")
return False
MQTT_CLIENT = MQTT(hass, broker, port, topic, keepalive, qos,
MQTT_CLIENT_ID)
def stop_mqtt(event): def stop_mqtt(event):
""" Stop MQTT component. """ """ Stop MQTT component. """
@ -105,29 +115,23 @@ def setup(hass, config):
def start_mqtt(event): def start_mqtt(event):
""" Launch MQTT component when Home Assistant starts up. """ """ Launch MQTT component when Home Assistant starts up. """
try: MQTT_CLIENT.subscribe('{}/#'.format(topic))
MQTT_CLIENT.run() MQTT_CLIENT.start()
except ConnectionRefusedError:
_LOGGER.exception("Can't connect to the broker. "
"Please check your settings and the broker"
"itself.")
return False
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, stop_mqtt) hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, stop_mqtt)
def mqtt_message(call): def publish_service(call):
""" Handle sending MQTT message service calls. """ """ Handle MQTT publish service calls. """
subtopic = call.data.get(ATTR_SUBTOPIC)
complete_topic = '{}/{}'.format(str(topic), str(subtopic))
payload = call.data.get(ATTR_PAYLOAD) payload = call.data.get(ATTR_PAYLOAD)
if payload is None:
return
subtopic = call.data.get(ATTR_SUBTOPIC)
msg_topic = '{}/{}'.format(topic, subtopic) if subtopic else topic
MQTT_CLIENT.publish(complete_topic, payload=payload) MQTT_CLIENT.publish(msg_topic, payload=payload)
hass.bus.listen_once(EVENT_HOMEASSISTANT_START, start_mqtt) hass.bus.listen_once(EVENT_HOMEASSISTANT_START, start_mqtt)
hass.services.register(DOMAIN, MQTT_SEND, mqtt_message) hass.services.register(DOMAIN, SERVICE_PUBLISH, publish_service)
return True return True
@ -137,60 +141,44 @@ def setup(hass, config):
# pylint: disable=too-many-arguments, invalid-name # pylint: disable=too-many-arguments, invalid-name
class MQTT(object): class MQTT(object):
""" Implements messaging service for MQTT. """ """ Implements messaging service for MQTT. """
def __init__(self, hass, broker, port, topic, keepalive, qos, clientid): def __init__(self, hass, broker, port, keepalive, qos):
import paho.mqtt.client as mqtt
self.hass = hass self.hass = hass
self._broker = broker
self._port = port
self._topic = topic
self._keepalive = keepalive
self._qos = qos self._qos = qos
self._mqttc = mqtt.Client(clientid) self._mqttc = mqtt.Client()
self._mqttc.on_message = self.mqtt_on_message self._mqttc.on_message = self.mqtt_on_message
self._mqttc.on_connect = self.mqtt_on_connect self._mqttc.connect(broker, port, keepalive)
self._mqttc.on_publish = self.mqtt_on_publish
self._mqttc.on_subscribe = self.mqtt_on_subscribe
def mqtt_on_connect(self, mqttc, obj, flags, rc):
""" Connect callback """
_LOGGER.info('Connected to broker %s', self._broker)
def mqtt_on_publish(self, mqttc, obj, mid):
""" Publish callback """
pass
def mqtt_on_subscribe(self, mqttc, obj, mid, granted_qos):
""" Subscribe callback """
complete_topic = '{}/#'.format(self._topic)
_LOGGER.info('Subscribed to %s', complete_topic)
def mqtt_on_message(self, mqttc, obj, msg): def mqtt_on_message(self, mqttc, obj, msg):
""" Message callback """ """ Message callback """
if '/' in msg.topic:
msg_topic, msg_subtopic = msg.topic.split('/', 1)
else:
msg_topic, msg_subtopic = msg.topic, ''
self.hass.bus.fire(EVENT_MQTT_MESSAGE_RECEIVED, { self.hass.bus.fire(EVENT_MQTT_MESSAGE_RECEIVED, {
'topic': msg.topic, ATTR_TOPIC: msg_topic,
'qos': str(msg.qos), ATTR_SUBTOPIC: msg_subtopic,
'payload': msg.payload.decode('utf-8'), ATTR_QOS: msg.qos,
ATTR_PAYLOAD: msg.payload.decode('utf-8'),
}) })
def subscribe(self, topic): def subscribe(self, topic):
""" Subscribe to a topic. """ """ Subscribe to a topic. """
self._mqttc.subscribe(self._topic, qos=self._qos) self._mqttc.subscribe(topic, qos=self._qos)
def unsubscribe(self, topic): def unsubscribe(self, topic):
""" Unsubscribe from topic. """ """ Unsubscribe from topic. """
self._mqttc.unsubscribe(topic) self._mqttc.unsubscribe(topic)
def stop(self): def stop(self):
""" Stop the MWTT client. """ """ Stop the MQTT client. """
self._mqttc.loop_stop() self._mqttc.loop_stop()
def run(self): def start(self):
""" Run the MQTT client. """ """ Run the MQTT client. """
self._mqttc.connect(self._broker,
int(self._port),
int(self._keepalive))
self._mqttc.subscribe('{}/#'.format(self._topic), qos=self._qos)
self._mqttc.loop_start() self._mqttc.loop_start()
def publish(self, topic, payload): def publish(self, topic, payload):