Service validation for MQTT component.

* Service validation for MQTT component.

* Fixes for failing mqtt tests.

- Changed behaviour when both 'payload' and 'payload_template' are provided
  instead of silently ignoring the payload_template value.
- Have validation accept any type of payload and leave encoding to paho-mqtt.
This commit is contained in:
Jan Harkes 2016-04-02 11:46:09 -04:00 committed by Paulus Schoutsen
parent ebd053824d
commit 5c753f8ffd
2 changed files with 32 additions and 24 deletions

View File

@ -9,12 +9,14 @@ import os
import socket
import time
import voluptuous as vol
from homeassistant.bootstrap import prepare_setup_platform
from homeassistant.config import load_yaml_config_file
from homeassistant.exceptions import HomeAssistantError
import homeassistant.util as util
from homeassistant.helpers import template
import homeassistant.helpers.config_validation as cv
from homeassistant.const import (
EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP)
@ -57,6 +59,22 @@ ATTR_RETAIN = 'retain'
MAX_RECONNECT_WAIT = 300 # seconds
# Service call validation schema
def mqtt_topic(value):
"""Validate that we can publish using this MQTT topic."""
if isinstance(value, str) and all(c not in value for c in '#+\0'):
return vol.Length(min=1, max=65535)(value)
raise vol.Invalid('Invalid MQTT topic name')
MQTT_PUBLISH_SCHEMA = vol.Schema({
vol.Required(ATTR_TOPIC): mqtt_topic,
vol.Exclusive(ATTR_PAYLOAD, 'payload'): object,
vol.Exclusive(ATTR_PAYLOAD_TEMPLATE, 'payload'): cv.string,
vol.Required(ATTR_QOS, default=DEFAULT_QOS): vol.In([0, 1, 2]),
vol.Required(ATTR_RETAIN, default=DEFAULT_RETAIN): vol.Coerce(bool),
})
def _build_publish_data(topic, qos, retain):
"""Build the arguments for the publish service without the payload."""
data = {ATTR_TOPIC: topic}
@ -170,26 +188,18 @@ def setup(hass, config):
def publish_service(call):
"""Handle MQTT publish service calls."""
msg_topic = call.data.get(ATTR_TOPIC)
msg_topic = call.data[ATTR_TOPIC]
payload = call.data.get(ATTR_PAYLOAD)
payload_template = call.data.get(ATTR_PAYLOAD_TEMPLATE)
qos = call.data.get(ATTR_QOS, DEFAULT_QOS)
retain = call.data.get(ATTR_RETAIN, DEFAULT_RETAIN)
if payload is None:
if payload_template is None:
_LOGGER.error(
"You must set either '%s' or '%s' to use this service",
ATTR_PAYLOAD, ATTR_PAYLOAD_TEMPLATE)
return
try:
payload = template.render(hass, payload_template)
except template.jinja2.TemplateError as exc:
_LOGGER.error(
"Unable to publish to '%s': rendering payload template of "
"'%s' failed because %s.",
msg_topic, payload_template, exc)
return
if msg_topic is None or payload is None:
qos = call.data[ATTR_QOS]
retain = call.data[ATTR_RETAIN]
try:
payload = payload or template.render(hass, payload_template)
except template.jinja2.TemplateError as exc:
_LOGGER.error(
"Unable to publish to '%s': rendering payload template of "
"'%s' failed because %s.",
msg_topic, payload_template, exc)
return
MQTT_CLIENT.publish(msg_topic, payload, qos, retain)
@ -199,7 +209,8 @@ def setup(hass, config):
os.path.join(os.path.dirname(__file__), 'services.yaml'))
hass.services.register(DOMAIN, SERVICE_PUBLISH, publish_service,
descriptions.get(SERVICE_PUBLISH))
descriptions.get(SERVICE_PUBLISH),
schema=MQTT_PUBLISH_SCHEMA)
return True

View File

@ -90,19 +90,16 @@ class TestMQTT(unittest.TestCase):
def test_service_call_with_payload_doesnt_render_template(self):
"""Test the service call with unrendered template.
If a 'payload' is provided then use that instead of 'payload_template'.
If both 'payload' and 'payload_template' are provided then fail.
"""
payload = "not a template"
payload_template = "a template"
# Call the service directly because the helper functions don't allow
# you to provide payload AND payload_template.
self.hass.services.call(mqtt.DOMAIN, mqtt.SERVICE_PUBLISH, {
mqtt.ATTR_TOPIC: "test/topic",
mqtt.ATTR_PAYLOAD: payload,
mqtt.ATTR_PAYLOAD_TEMPLATE: payload_template
}, blocking=True)
self.assertTrue(mqtt.MQTT_CLIENT.publish.called)
self.assertEqual(mqtt.MQTT_CLIENT.publish.call_args[0][1], payload)
self.assertFalse(mqtt.MQTT_CLIENT.publish.called)
def test_service_call_without_payload_or_payload_template(self):
"""Test the service call without payload or payload template.