From a2c6dbf4794b21e50c025462be61c08cf17e7c1c Mon Sep 17 00:00:00 2001 From: Paulus Schoutsen Date: Sat, 8 Aug 2015 23:49:38 -0700 Subject: [PATCH] Clean up MQTT component --- homeassistant/components/mqtt.py | 148 ++++++++++++++----------------- 1 file changed, 68 insertions(+), 80 deletions(-) diff --git a/homeassistant/components/mqtt.py b/homeassistant/components/mqtt.py index 592706f701f..195360c9b03 100644 --- a/homeassistant/components/mqtt.py +++ b/homeassistant/components/mqtt.py @@ -29,8 +29,8 @@ port The network port to connect to. Default is 1883. topic -*Required -The MQTT topic to subscribe to, e.g. home-assistant. +*Optional +The MQTT topic to subscribe to. Default is home-assistant. keepalive *Optional @@ -42,62 +42,72 @@ Quality of service level to use for the subscription. 0, 1, or 2, defaults to 0. """ import logging +import socket +import homeassistant.util as util from homeassistant.helpers import validate_config from homeassistant.const import ( EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP) -try: - import paho.mqtt.client as mqtt -except ImportError: - mqtt = None - _LOGGER = logging.getLogger(__name__) DOMAIN = "mqtt" -DEPENDENCIES = [] + MQTT_CLIENT = None -MQTT_CLIENT_ID = 'home-assistant' -MQTT_DEFAULT_PORT = 1883 -MQTT_DEFAULT_KEEPALIVE = 60 -MQTT_DEFAULT_QOS = 0 -MQTT_SEND = 'mqtt_send' + +DEFAULT_TOPIC = 'home-assistant' +DEFAULT_PORT = 1883 +DEFAULT_KEEPALIVE = 60 +DEFAULT_QOS = 0 + +SERVICE_PUBLISH = 'publish' EVENT_MQTT_MESSAGE_RECEIVED = 'MQTT_MESSAGE_RECEIVED' + +DEPENDENCIES = [] REQUIREMENTS = ['paho-mqtt>=1.1'] +CONF_BROKER = 'broker' +CONF_PORT = 'port' +CONF_TOPIC = 'topic' +CONF_KEEPALIVE = 'keepalive' +CONF_QOS = 'qos' + +ATTR_QOS = 'qos' +ATTR_TOPIC = 'topic' ATTR_SUBTOPIC = 'subtopic' ATTR_PAYLOAD = 'payload' -def send_message(hass, subtopic, payload): +def publish(hass, payload, subtopic=None): """ Send an MQTT message. """ - hass.services.call(DOMAIN, MQTT_SEND, {ATTR_SUBTOPIC: subtopic, - ATTR_PAYLOAD: payload}) + data = {ATTR_PAYLOAD: payload} + if subtopic is not None: + data[ATTR_SUBTOPIC] = subtopic + hass.services.call(DOMAIN, SERVICE_PUBLISH, data) def setup(hass, config): """ Get the MQTT protocol service. """ - if not validate_config(config, - {DOMAIN: ['broker', - 'topic']}, - _LOGGER): + if not validate_config(config, {DOMAIN: ['broker']}, _LOGGER): return False - if mqtt is None: - _LOGGER.error("Error while importing dependency 'paho-mqtt'.") - return False + conf = config[DOMAIN] + + broker = conf[CONF_BROKER] + port = util.convert(conf.get(CONF_PORT), int, DEFAULT_PORT) + topic = util.convert(conf.get(CONF_TOPIC), str, DEFAULT_TOPIC) + keepalive = util.convert(conf.get(CONF_KEEPALIVE), int, DEFAULT_KEEPALIVE) + qos = util.convert(conf.get(CONF_QOS), int, DEFAULT_QOS) global MQTT_CLIENT - - broker = config[DOMAIN]['broker'] - port = config[DOMAIN].get('port', MQTT_DEFAULT_PORT) - topic = config[DOMAIN]['topic'] - keepalive = config[DOMAIN].get('keepalive', MQTT_DEFAULT_KEEPALIVE) - qos = config[DOMAIN].get('qos', MQTT_DEFAULT_QOS) - - MQTT_CLIENT = MQTT(hass, broker, port, topic, keepalive, qos, - MQTT_CLIENT_ID) + try: + MQTT_CLIENT = MQTT(hass, broker, port, keepalive, qos) + except socket.error: + _LOGGER.exception("Can't connect to the broker. " + "Please check your settings and the broker " + "itself.") + return False def stop_mqtt(event): """ Stop MQTT component. """ @@ -105,29 +115,23 @@ def setup(hass, config): def start_mqtt(event): """ Launch MQTT component when Home Assistant starts up. """ - try: - MQTT_CLIENT.run() - - except ConnectionRefusedError: - _LOGGER.exception("Can't connect to the broker. " - "Please check your settings and the broker" - "itself.") - return False - + MQTT_CLIENT.subscribe('{}/#'.format(topic)) + MQTT_CLIENT.start() hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, stop_mqtt) - def mqtt_message(call): - """ Handle sending MQTT message service calls. """ - subtopic = call.data.get(ATTR_SUBTOPIC) - complete_topic = '{}/{}'.format(str(topic), str(subtopic)) - + def publish_service(call): + """ Handle MQTT publish service calls. """ payload = call.data.get(ATTR_PAYLOAD) + if payload is None: + return + subtopic = call.data.get(ATTR_SUBTOPIC) + msg_topic = '{}/{}'.format(topic, subtopic) if subtopic else topic - MQTT_CLIENT.publish(complete_topic, payload=payload) + MQTT_CLIENT.publish(msg_topic, payload=payload) hass.bus.listen_once(EVENT_HOMEASSISTANT_START, start_mqtt) - hass.services.register(DOMAIN, MQTT_SEND, mqtt_message) + hass.services.register(DOMAIN, SERVICE_PUBLISH, publish_service) return True @@ -137,60 +141,44 @@ def setup(hass, config): # pylint: disable=too-many-arguments, invalid-name class MQTT(object): """ Implements messaging service for MQTT. """ - def __init__(self, hass, broker, port, topic, keepalive, qos, clientid): + def __init__(self, hass, broker, port, keepalive, qos): + import paho.mqtt.client as mqtt self.hass = hass - self._broker = broker - self._port = port - self._topic = topic - self._keepalive = keepalive self._qos = qos - self._mqttc = mqtt.Client(clientid) + self._mqttc = mqtt.Client() self._mqttc.on_message = self.mqtt_on_message - self._mqttc.on_connect = self.mqtt_on_connect - self._mqttc.on_publish = self.mqtt_on_publish - self._mqttc.on_subscribe = self.mqtt_on_subscribe - - def mqtt_on_connect(self, mqttc, obj, flags, rc): - """ Connect callback """ - _LOGGER.info('Connected to broker %s', self._broker) - - def mqtt_on_publish(self, mqttc, obj, mid): - """ Publish callback """ - pass - - def mqtt_on_subscribe(self, mqttc, obj, mid, granted_qos): - """ Subscribe callback """ - complete_topic = '{}/#'.format(self._topic) - _LOGGER.info('Subscribed to %s', complete_topic) + self._mqttc.connect(broker, port, keepalive) def mqtt_on_message(self, mqttc, obj, msg): """ Message callback """ + if '/' in msg.topic: + msg_topic, msg_subtopic = msg.topic.split('/', 1) + else: + msg_topic, msg_subtopic = msg.topic, '' + self.hass.bus.fire(EVENT_MQTT_MESSAGE_RECEIVED, { - 'topic': msg.topic, - 'qos': str(msg.qos), - 'payload': msg.payload.decode('utf-8'), + ATTR_TOPIC: msg_topic, + ATTR_SUBTOPIC: msg_subtopic, + ATTR_QOS: msg.qos, + ATTR_PAYLOAD: msg.payload.decode('utf-8'), }) def subscribe(self, topic): """ Subscribe to a topic. """ - self._mqttc.subscribe(self._topic, qos=self._qos) + self._mqttc.subscribe(topic, qos=self._qos) def unsubscribe(self, topic): """ Unsubscribe from topic. """ self._mqttc.unsubscribe(topic) def stop(self): - """ Stop the MWTT client. """ + """ Stop the MQTT client. """ self._mqttc.loop_stop() - def run(self): + def start(self): """ Run the MQTT client. """ - self._mqttc.connect(self._broker, - int(self._port), - int(self._keepalive)) - self._mqttc.subscribe('{}/#'.format(self._topic), qos=self._qos) self._mqttc.loop_start() def publish(self, topic, payload):