diff --git a/.gitignore b/.gitignore index 8dab1d873da..658ad279292 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,7 @@ homeassistant/components/frontend/www_static/polymer/bower_components/* config/custom_components/* !config/custom_components/example.py !config/custom_components/hello_world.py +!config/custom_components/mqtt_example.py # Hide sublime text stuff *.sublime-project diff --git a/config/custom_components/mqtt_example.py b/config/custom_components/mqtt_example.py new file mode 100644 index 00000000000..5b54226cb7c --- /dev/null +++ b/config/custom_components/mqtt_example.py @@ -0,0 +1,60 @@ +""" +custom_components.mqtt_example +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Shows how to communicate with MQTT. Follows a topic on MQTT and updates the +state of an entity to the last message received on that topic. + +Also offers a service 'set_state' that will publish a message on the topic that +will be passed via MQTT to our message received listener. Call the service with +example payload {"new_state": "some new state"}. + +Configuration: + +To use the mqtt_example component you will need to add the following to your +config/configuration.yaml + +mqtt_example: + topic: home-assistant/mqtt_example + +""" +import homeassistant.loader as loader + +# The domain of your component. Should be equal to the name of your component +DOMAIN = "mqtt_example" + +# List of component names (string) your component depends upon +DEPENDENCIES = ['mqtt'] + + +CONF_TOPIC = 'topic' +DEFAULT_TOPIC = 'home-assistant/mqtt_example' + + +def setup(hass, config): + """ Setup our mqtt_example component. """ + mqtt = loader.get_component('mqtt') + topic = config[DOMAIN].get('topic', DEFAULT_TOPIC) + entity_id = 'mqtt_example.last_message' + + # Listen to a message on MQTT + + def message_received(topic, payload, qos): + """ A new MQTT message has been received. """ + hass.states.set(entity_id, payload) + + mqtt.subscribe(hass, topic, message_received) + + hass.states.set(entity_id, 'No messages') + + # Service to publish a message on MQTT + + def set_state_service(call): + """ Service to send a message. """ + mqtt.publish(hass, topic, call.data.get('new_state')) + + # Register our service with Home Assistant + hass.services.register(DOMAIN, 'set_state', set_state_service) + + # return boolean to indicate that initialization was successful + return True diff --git a/homeassistant/components/mqtt.py b/homeassistant/components/mqtt.py index 195360c9b03..b7882bdbcc2 100644 --- a/homeassistant/components/mqtt.py +++ b/homeassistant/components/mqtt.py @@ -11,12 +11,16 @@ Configuration: To use MQTT you will need to add something like the following to your config/configuration.yaml. +mqtt: + broker: 127.0.0.1 + +Or, if you want more options: + mqtt: broker: 127.0.0.1 port: 1883 - topic: home-assistant + client_id: home-assistant-1 keepalive: 60 - qos: 0 Variables: @@ -28,18 +32,14 @@ port *Optional The network port to connect to. Default is 1883. -topic +client_id *Optional -The MQTT topic to subscribe to. Default is home-assistant. +Client ID that Home Assistant will use. Has to be unique on the server. +Default is a random generated one. keepalive *Optional -The keep alive in seconds for this client, e.g. 60. - -qos -*Optional -Quality of service level to use for the subscription. -0, 1, or 2, defaults to 0. +The keep alive in seconds for this client. Default is 60. """ import logging import socket @@ -55,7 +55,6 @@ DOMAIN = "mqtt" MQTT_CLIENT = None -DEFAULT_TOPIC = 'home-assistant' DEFAULT_PORT = 1883 DEFAULT_KEEPALIVE = 60 DEFAULT_QOS = 0 @@ -68,24 +67,36 @@ REQUIREMENTS = ['paho-mqtt>=1.1'] CONF_BROKER = 'broker' CONF_PORT = 'port' -CONF_TOPIC = 'topic' +CONF_CLIENT_ID = 'client_id' CONF_KEEPALIVE = 'keepalive' -CONF_QOS = 'qos' ATTR_QOS = 'qos' ATTR_TOPIC = 'topic' -ATTR_SUBTOPIC = 'subtopic' ATTR_PAYLOAD = 'payload' -def publish(hass, payload, subtopic=None): +def publish(hass, topic, payload): """ Send an MQTT message. """ - data = {ATTR_PAYLOAD: payload} - if subtopic is not None: - data[ATTR_SUBTOPIC] = subtopic + data = { + ATTR_TOPIC: topic, + ATTR_PAYLOAD: payload, + } hass.services.call(DOMAIN, SERVICE_PUBLISH, data) +def subscribe(hass, topic, callback, qos=0): + """ Subscribe to a topic. """ + def mqtt_topic_subscriber(event): + """ Subscribes to a specific MQTT topic. """ + if event.data[ATTR_TOPIC] == topic: + callback(topic, event.data[ATTR_PAYLOAD], event.data[ATTR_QOS]) + + hass.bus.listen(EVENT_MQTT_MESSAGE_RECEIVED, mqtt_topic_subscriber) + + if topic not in MQTT_CLIENT.topics: + MQTT_CLIENT.subscribe(topic, qos) + + def setup(hass, config): """ Get the MQTT protocol service. """ @@ -96,13 +107,12 @@ def setup(hass, config): broker = conf[CONF_BROKER] port = util.convert(conf.get(CONF_PORT), int, DEFAULT_PORT) - topic = util.convert(conf.get(CONF_TOPIC), str, DEFAULT_TOPIC) + client_id = util.convert(conf.get(CONF_CLIENT_ID), str) keepalive = util.convert(conf.get(CONF_KEEPALIVE), int, DEFAULT_KEEPALIVE) - qos = util.convert(conf.get(CONF_QOS), int, DEFAULT_QOS) global MQTT_CLIENT try: - MQTT_CLIENT = MQTT(hass, broker, port, keepalive, qos) + MQTT_CLIENT = MQTT(hass, broker, port, client_id, keepalive) except socket.error: _LOGGER.exception("Can't connect to the broker. " "Please check your settings and the broker " @@ -115,19 +125,16 @@ def setup(hass, config): def start_mqtt(event): """ Launch MQTT component when Home Assistant starts up. """ - MQTT_CLIENT.subscribe('{}/#'.format(topic)) MQTT_CLIENT.start() hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, stop_mqtt) def publish_service(call): """ Handle MQTT publish service calls. """ + msg_topic = call.data.get(ATTR_TOPIC) payload = call.data.get(ATTR_PAYLOAD) - if payload is None: + if msg_topic is None or payload is None: return - subtopic = call.data.get(ATTR_SUBTOPIC) - msg_topic = '{}/{}'.format(topic, subtopic) if subtopic else topic - - MQTT_CLIENT.publish(msg_topic, payload=payload) + MQTT_CLIENT.publish(msg_topic, payload) hass.bus.listen_once(EVENT_HOMEASSISTANT_START, start_mqtt) @@ -138,49 +145,78 @@ def setup(hass, config): # This is based on one of the paho-mqtt examples: # http://git.eclipse.org/c/paho/org.eclipse.paho.mqtt.python.git/tree/examples/sub-class.py -# pylint: disable=too-many-arguments, invalid-name class MQTT(object): """ Implements messaging service for MQTT. """ - def __init__(self, hass, broker, port, keepalive, qos): + def __init__(self, hass, broker, port, client_id, keepalive): import paho.mqtt.client as mqtt self.hass = hass - self._qos = qos + self._progress = {} + self.topics = {} - self._mqttc = mqtt.Client() - self._mqttc.on_message = self.mqtt_on_message + if client_id is None: + self._mqttc = mqtt.Client() + else: + self._mqttc = mqtt.Client(client_id) + self._mqttc.on_subscribe = self._mqtt_on_subscribe + self._mqttc.on_unsubscribe = self._mqtt_on_unsubscribe + self._mqttc.on_connect = self._mqtt_on_connect + self._mqttc.on_message = self._mqtt_on_message self._mqttc.connect(broker, port, keepalive) - def mqtt_on_message(self, mqttc, obj, msg): - """ Message callback """ - if '/' in msg.topic: - msg_topic, msg_subtopic = msg.topic.split('/', 1) - else: - msg_topic, msg_subtopic = msg.topic, '' - - self.hass.bus.fire(EVENT_MQTT_MESSAGE_RECEIVED, { - ATTR_TOPIC: msg_topic, - ATTR_SUBTOPIC: msg_subtopic, - ATTR_QOS: msg.qos, - ATTR_PAYLOAD: msg.payload.decode('utf-8'), - }) - - def subscribe(self, topic): - """ Subscribe to a topic. """ - self._mqttc.subscribe(topic, qos=self._qos) + def publish(self, topic, payload): + """ Publish a MQTT message. """ + self._mqttc.publish(topic, payload) def unsubscribe(self, topic): """ Unsubscribe from topic. """ - self._mqttc.unsubscribe(topic) - - def stop(self): - """ Stop the MQTT client. """ - self._mqttc.loop_stop() + result, mid = self._mqttc.unsubscribe(topic) + self._progress[mid] = topic def start(self): """ Run the MQTT client. """ self._mqttc.loop_start() - def publish(self, topic, payload): - """ Publish a MQTT message. """ - self._mqttc.publish(topic, payload) + def stop(self): + """ Stop the MQTT client. """ + self._mqttc.loop_stop() + + def subscribe(self, topic, qos): + """ Subscribe to a topic. """ + if topic in self.topics: + return + result, mid = self._mqttc.subscribe(topic, qos) + self._progress[mid] = topic + self.topics[topic] = None + + def _mqtt_on_connect(self, mqttc, obj, flags, rc): + """ On connect, resubscribe to all topics we were subscribed to. """ + old_topics = self.topics + self._progress = {} + self.topics = {} + for topic, qos in old_topics.items(): + # qos is None if we were in process of subscribing + if qos is not None: + self._mqttc.subscribe(topic, qos) + + def _mqtt_on_subscribe(self, mqttc, obj, mid, granted_qos): + """ Called when subscribe succesfull. """ + topic = self._progress.pop(mid, None) + if topic is None: + return + self.topics[topic] = granted_qos + + def _mqtt_on_unsubscribe(self, mqttc, obj, mid, granted_qos): + """ Called when subscribe succesfull. """ + topic = self._progress.pop(mid, None) + if topic is None: + return + self.topics.pop(topic, None) + + def _mqtt_on_message(self, mqttc, obj, msg): + """ Message callback """ + self.hass.bus.fire(EVENT_MQTT_MESSAGE_RECEIVED, { + ATTR_TOPIC: msg.topic, + ATTR_QOS: msg.qos, + ATTR_PAYLOAD: msg.payload.decode('utf-8'), + })