Update MQTT component and add example

This commit is contained in:
Paulus Schoutsen 2015-08-09 11:29:50 -07:00
parent a2c6dbf479
commit c8b54d7468
3 changed files with 154 additions and 57 deletions

1
.gitignore vendored
View File

@ -7,6 +7,7 @@ homeassistant/components/frontend/www_static/polymer/bower_components/*
config/custom_components/* config/custom_components/*
!config/custom_components/example.py !config/custom_components/example.py
!config/custom_components/hello_world.py !config/custom_components/hello_world.py
!config/custom_components/mqtt_example.py
# Hide sublime text stuff # Hide sublime text stuff
*.sublime-project *.sublime-project

View File

@ -0,0 +1,60 @@
"""
custom_components.mqtt_example
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Shows how to communicate with MQTT. Follows a topic on MQTT and updates the
state of an entity to the last message received on that topic.
Also offers a service 'set_state' that will publish a message on the topic that
will be passed via MQTT to our message received listener. Call the service with
example payload {"new_state": "some new state"}.
Configuration:
To use the mqtt_example component you will need to add the following to your
config/configuration.yaml
mqtt_example:
topic: home-assistant/mqtt_example
"""
import homeassistant.loader as loader
# The domain of your component. Should be equal to the name of your component
DOMAIN = "mqtt_example"
# List of component names (string) your component depends upon
DEPENDENCIES = ['mqtt']
CONF_TOPIC = 'topic'
DEFAULT_TOPIC = 'home-assistant/mqtt_example'
def setup(hass, config):
""" Setup our mqtt_example component. """
mqtt = loader.get_component('mqtt')
topic = config[DOMAIN].get('topic', DEFAULT_TOPIC)
entity_id = 'mqtt_example.last_message'
# Listen to a message on MQTT
def message_received(topic, payload, qos):
""" A new MQTT message has been received. """
hass.states.set(entity_id, payload)
mqtt.subscribe(hass, topic, message_received)
hass.states.set(entity_id, 'No messages')
# Service to publish a message on MQTT
def set_state_service(call):
""" Service to send a message. """
mqtt.publish(hass, topic, call.data.get('new_state'))
# Register our service with Home Assistant
hass.services.register(DOMAIN, 'set_state', set_state_service)
# return boolean to indicate that initialization was successful
return True

View File

@ -11,12 +11,16 @@ Configuration:
To use MQTT you will need to add something like the following to your To use MQTT you will need to add something like the following to your
config/configuration.yaml. config/configuration.yaml.
mqtt:
broker: 127.0.0.1
Or, if you want more options:
mqtt: mqtt:
broker: 127.0.0.1 broker: 127.0.0.1
port: 1883 port: 1883
topic: home-assistant client_id: home-assistant-1
keepalive: 60 keepalive: 60
qos: 0
Variables: Variables:
@ -28,18 +32,14 @@ port
*Optional *Optional
The network port to connect to. Default is 1883. The network port to connect to. Default is 1883.
topic client_id
*Optional *Optional
The MQTT topic to subscribe to. Default is home-assistant. Client ID that Home Assistant will use. Has to be unique on the server.
Default is a random generated one.
keepalive keepalive
*Optional *Optional
The keep alive in seconds for this client, e.g. 60. The keep alive in seconds for this client. Default is 60.
qos
*Optional
Quality of service level to use for the subscription.
0, 1, or 2, defaults to 0.
""" """
import logging import logging
import socket import socket
@ -55,7 +55,6 @@ DOMAIN = "mqtt"
MQTT_CLIENT = None MQTT_CLIENT = None
DEFAULT_TOPIC = 'home-assistant'
DEFAULT_PORT = 1883 DEFAULT_PORT = 1883
DEFAULT_KEEPALIVE = 60 DEFAULT_KEEPALIVE = 60
DEFAULT_QOS = 0 DEFAULT_QOS = 0
@ -68,24 +67,36 @@ REQUIREMENTS = ['paho-mqtt>=1.1']
CONF_BROKER = 'broker' CONF_BROKER = 'broker'
CONF_PORT = 'port' CONF_PORT = 'port'
CONF_TOPIC = 'topic' CONF_CLIENT_ID = 'client_id'
CONF_KEEPALIVE = 'keepalive' CONF_KEEPALIVE = 'keepalive'
CONF_QOS = 'qos'
ATTR_QOS = 'qos' ATTR_QOS = 'qos'
ATTR_TOPIC = 'topic' ATTR_TOPIC = 'topic'
ATTR_SUBTOPIC = 'subtopic'
ATTR_PAYLOAD = 'payload' ATTR_PAYLOAD = 'payload'
def publish(hass, payload, subtopic=None): def publish(hass, topic, payload):
""" Send an MQTT message. """ """ Send an MQTT message. """
data = {ATTR_PAYLOAD: payload} data = {
if subtopic is not None: ATTR_TOPIC: topic,
data[ATTR_SUBTOPIC] = subtopic ATTR_PAYLOAD: payload,
}
hass.services.call(DOMAIN, SERVICE_PUBLISH, data) hass.services.call(DOMAIN, SERVICE_PUBLISH, data)
def subscribe(hass, topic, callback, qos=0):
""" Subscribe to a topic. """
def mqtt_topic_subscriber(event):
""" Subscribes to a specific MQTT topic. """
if event.data[ATTR_TOPIC] == topic:
callback(topic, event.data[ATTR_PAYLOAD], event.data[ATTR_QOS])
hass.bus.listen(EVENT_MQTT_MESSAGE_RECEIVED, mqtt_topic_subscriber)
if topic not in MQTT_CLIENT.topics:
MQTT_CLIENT.subscribe(topic, qos)
def setup(hass, config): def setup(hass, config):
""" Get the MQTT protocol service. """ """ Get the MQTT protocol service. """
@ -96,13 +107,12 @@ def setup(hass, config):
broker = conf[CONF_BROKER] broker = conf[CONF_BROKER]
port = util.convert(conf.get(CONF_PORT), int, DEFAULT_PORT) port = util.convert(conf.get(CONF_PORT), int, DEFAULT_PORT)
topic = util.convert(conf.get(CONF_TOPIC), str, DEFAULT_TOPIC) client_id = util.convert(conf.get(CONF_CLIENT_ID), str)
keepalive = util.convert(conf.get(CONF_KEEPALIVE), int, DEFAULT_KEEPALIVE) keepalive = util.convert(conf.get(CONF_KEEPALIVE), int, DEFAULT_KEEPALIVE)
qos = util.convert(conf.get(CONF_QOS), int, DEFAULT_QOS)
global MQTT_CLIENT global MQTT_CLIENT
try: try:
MQTT_CLIENT = MQTT(hass, broker, port, keepalive, qos) MQTT_CLIENT = MQTT(hass, broker, port, client_id, keepalive)
except socket.error: except socket.error:
_LOGGER.exception("Can't connect to the broker. " _LOGGER.exception("Can't connect to the broker. "
"Please check your settings and the broker " "Please check your settings and the broker "
@ -115,19 +125,16 @@ def setup(hass, config):
def start_mqtt(event): def start_mqtt(event):
""" Launch MQTT component when Home Assistant starts up. """ """ Launch MQTT component when Home Assistant starts up. """
MQTT_CLIENT.subscribe('{}/#'.format(topic))
MQTT_CLIENT.start() MQTT_CLIENT.start()
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, stop_mqtt) hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, stop_mqtt)
def publish_service(call): def publish_service(call):
""" Handle MQTT publish service calls. """ """ Handle MQTT publish service calls. """
msg_topic = call.data.get(ATTR_TOPIC)
payload = call.data.get(ATTR_PAYLOAD) payload = call.data.get(ATTR_PAYLOAD)
if payload is None: if msg_topic is None or payload is None:
return return
subtopic = call.data.get(ATTR_SUBTOPIC) MQTT_CLIENT.publish(msg_topic, payload)
msg_topic = '{}/{}'.format(topic, subtopic) if subtopic else topic
MQTT_CLIENT.publish(msg_topic, payload=payload)
hass.bus.listen_once(EVENT_HOMEASSISTANT_START, start_mqtt) hass.bus.listen_once(EVENT_HOMEASSISTANT_START, start_mqtt)
@ -138,49 +145,78 @@ def setup(hass, config):
# This is based on one of the paho-mqtt examples: # This is based on one of the paho-mqtt examples:
# http://git.eclipse.org/c/paho/org.eclipse.paho.mqtt.python.git/tree/examples/sub-class.py # http://git.eclipse.org/c/paho/org.eclipse.paho.mqtt.python.git/tree/examples/sub-class.py
# pylint: disable=too-many-arguments, invalid-name
class MQTT(object): class MQTT(object):
""" Implements messaging service for MQTT. """ """ Implements messaging service for MQTT. """
def __init__(self, hass, broker, port, keepalive, qos): def __init__(self, hass, broker, port, client_id, keepalive):
import paho.mqtt.client as mqtt import paho.mqtt.client as mqtt
self.hass = hass self.hass = hass
self._qos = qos self._progress = {}
self.topics = {}
if client_id is None:
self._mqttc = mqtt.Client() self._mqttc = mqtt.Client()
self._mqttc.on_message = self.mqtt_on_message else:
self._mqttc = mqtt.Client(client_id)
self._mqttc.on_subscribe = self._mqtt_on_subscribe
self._mqttc.on_unsubscribe = self._mqtt_on_unsubscribe
self._mqttc.on_connect = self._mqtt_on_connect
self._mqttc.on_message = self._mqtt_on_message
self._mqttc.connect(broker, port, keepalive) self._mqttc.connect(broker, port, keepalive)
def mqtt_on_message(self, mqttc, obj, msg): def publish(self, topic, payload):
""" Message callback """ """ Publish a MQTT message. """
if '/' in msg.topic: self._mqttc.publish(topic, payload)
msg_topic, msg_subtopic = msg.topic.split('/', 1)
else:
msg_topic, msg_subtopic = msg.topic, ''
self.hass.bus.fire(EVENT_MQTT_MESSAGE_RECEIVED, {
ATTR_TOPIC: msg_topic,
ATTR_SUBTOPIC: msg_subtopic,
ATTR_QOS: msg.qos,
ATTR_PAYLOAD: msg.payload.decode('utf-8'),
})
def subscribe(self, topic):
""" Subscribe to a topic. """
self._mqttc.subscribe(topic, qos=self._qos)
def unsubscribe(self, topic): def unsubscribe(self, topic):
""" Unsubscribe from topic. """ """ Unsubscribe from topic. """
self._mqttc.unsubscribe(topic) result, mid = self._mqttc.unsubscribe(topic)
self._progress[mid] = topic
def stop(self):
""" Stop the MQTT client. """
self._mqttc.loop_stop()
def start(self): def start(self):
""" Run the MQTT client. """ """ Run the MQTT client. """
self._mqttc.loop_start() self._mqttc.loop_start()
def publish(self, topic, payload): def stop(self):
""" Publish a MQTT message. """ """ Stop the MQTT client. """
self._mqttc.publish(topic, payload) self._mqttc.loop_stop()
def subscribe(self, topic, qos):
""" Subscribe to a topic. """
if topic in self.topics:
return
result, mid = self._mqttc.subscribe(topic, qos)
self._progress[mid] = topic
self.topics[topic] = None
def _mqtt_on_connect(self, mqttc, obj, flags, rc):
""" On connect, resubscribe to all topics we were subscribed to. """
old_topics = self.topics
self._progress = {}
self.topics = {}
for topic, qos in old_topics.items():
# qos is None if we were in process of subscribing
if qos is not None:
self._mqttc.subscribe(topic, qos)
def _mqtt_on_subscribe(self, mqttc, obj, mid, granted_qos):
""" Called when subscribe succesfull. """
topic = self._progress.pop(mid, None)
if topic is None:
return
self.topics[topic] = granted_qos
def _mqtt_on_unsubscribe(self, mqttc, obj, mid, granted_qos):
""" Called when subscribe succesfull. """
topic = self._progress.pop(mid, None)
if topic is None:
return
self.topics.pop(topic, None)
def _mqtt_on_message(self, mqttc, obj, msg):
""" Message callback """
self.hass.bus.fire(EVENT_MQTT_MESSAGE_RECEIVED, {
ATTR_TOPIC: msg.topic,
ATTR_QOS: msg.qos,
ATTR_PAYLOAD: msg.payload.decode('utf-8'),
})