mirror of
https://github.com/home-assistant/core.git
synced 2025-04-23 08:47:57 +00:00
Create mqtt eventstream component
This commit is contained in:
parent
31fcd230b1
commit
8ace656657
95
homeassistant/components/mqtt_eventstream.py
Normal file
95
homeassistant/components/mqtt_eventstream.py
Normal file
@ -0,0 +1,95 @@
|
||||
"""
|
||||
homeassistant.components.mqtt_eventstream
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
Connect two Home Assistant instances via mqtt.
|
||||
|
||||
Configuration:
|
||||
|
||||
To use the mqtt_eventstream component you will need to add the following to
|
||||
your configuration.yaml file.
|
||||
|
||||
If you do not specify a publish_topic you will not forward events to the queue.
|
||||
If you do not specify a subscribe_topic then you will not receive events from
|
||||
the remote server.
|
||||
|
||||
mqtt_eventstream:
|
||||
publish_topic: MyServerName
|
||||
subscribe_topic: OtherHaServerName
|
||||
"""
|
||||
import json
|
||||
from homeassistant.core import EventOrigin, State
|
||||
from homeassistant.const import (
|
||||
MATCH_ALL,
|
||||
EVENT_TIME_CHANGED,
|
||||
EVENT_CALL_SERVICE,
|
||||
EVENT_SERVICE_EXECUTED,
|
||||
EVENT_STATE_CHANGED,
|
||||
)
|
||||
|
||||
import homeassistant.loader as loader
|
||||
from homeassistant.remote import JSONEncoder
|
||||
|
||||
# The domain of your component. Should be equal to the name of your component
|
||||
DOMAIN = "mqtt_eventstream"
|
||||
|
||||
# List of component names (string) your component depends upon
|
||||
DEPENDENCIES = ['mqtt']
|
||||
|
||||
|
||||
def setup(hass, config):
|
||||
""" Setup our mqtt_eventstream component. """
|
||||
def _event_handler(event):
|
||||
""" Handle events by publishing them on the mqtt queue. """
|
||||
if event.origin != EventOrigin.local:
|
||||
return
|
||||
if event.event_type in (
|
||||
EVENT_TIME_CHANGED,
|
||||
EVENT_CALL_SERVICE,
|
||||
EVENT_SERVICE_EXECUTED
|
||||
):
|
||||
return
|
||||
event = {'event_type': event.event_type, 'event_data': event.data}
|
||||
msg = json.dumps(event, cls=JSONEncoder)
|
||||
mqtt.publish(hass, pub_topic, msg)
|
||||
|
||||
mqtt = loader.get_component('mqtt')
|
||||
pub_topic = config[DOMAIN].get('publish_topic', None)
|
||||
sub_topic = config[DOMAIN].get('subscribe_topic', None)
|
||||
|
||||
# Only listen for local events if you are going to publish them
|
||||
if (pub_topic):
|
||||
hass.bus.listen(MATCH_ALL, _event_handler)
|
||||
|
||||
# Process events from a remote server that are received on a queue
|
||||
def _event_receiver(topic, payload, qos):
|
||||
"""
|
||||
A new MQTT message, published by the other HA instance,
|
||||
has been received.
|
||||
"""
|
||||
# TODO error handling
|
||||
event = json.loads(payload)
|
||||
event_type = event.get('event_type')
|
||||
event_data = event.get('event_data')
|
||||
|
||||
# Special case handling for event STATE_CHANGED
|
||||
# We will try to convert state dicts back to State objects
|
||||
if event_type == EVENT_STATE_CHANGED and event_data:
|
||||
for key in ('old_state', 'new_state'):
|
||||
state = State.from_dict(event_data.get(key))
|
||||
|
||||
if state:
|
||||
event_data[key] = state
|
||||
|
||||
hass.bus.fire(
|
||||
event_type,
|
||||
event_data=event_data,
|
||||
origin=EventOrigin.remote
|
||||
)
|
||||
|
||||
# Only subscribe if you specified a topic
|
||||
if (sub_topic):
|
||||
mqtt.subscribe(hass, sub_topic, _event_receiver)
|
||||
|
||||
hass.states.set('{domain}.initialized'.format(domain=DOMAIN), True)
|
||||
# return boolean to indicate that initialization was successful
|
||||
return True
|
139
tests/components/test_mqtt_eventstream.py
Normal file
139
tests/components/test_mqtt_eventstream.py
Normal file
@ -0,0 +1,139 @@
|
||||
"""
|
||||
tests.test_component_mqtt_eventstream
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
Tests MQTT eventstream component.
|
||||
"""
|
||||
import json
|
||||
import unittest
|
||||
from unittest.mock import ANY, patch
|
||||
|
||||
import homeassistant.components.mqtt_eventstream as eventstream
|
||||
from homeassistant.const import EVENT_STATE_CHANGED
|
||||
from homeassistant.core import State
|
||||
from homeassistant.remote import JSONEncoder
|
||||
import homeassistant.util.dt as dt_util
|
||||
|
||||
from tests.common import (
|
||||
get_test_home_assistant,
|
||||
mock_mqtt_component,
|
||||
fire_mqtt_message,
|
||||
mock_state_change_event,
|
||||
fire_time_changed
|
||||
)
|
||||
|
||||
|
||||
class TestMqttEventStream(unittest.TestCase):
|
||||
""" Test the MQTT eventstream module. """
|
||||
|
||||
def setUp(self): # pylint: disable=invalid-name
|
||||
super(TestMqttEventStream, self).setUp()
|
||||
self.hass = get_test_home_assistant()
|
||||
self.mock_mqtt = mock_mqtt_component(self.hass)
|
||||
|
||||
def tearDown(self): # pylint: disable=invalid-name
|
||||
""" Stop down stuff we started. """
|
||||
self.hass.stop()
|
||||
|
||||
def add_eventstream(self, sub_topic=None, pub_topic=None):
|
||||
""" Add a mqtt_eventstream component to the hass. """
|
||||
config = {}
|
||||
if sub_topic:
|
||||
config['subscribe_topic'] = sub_topic
|
||||
if pub_topic:
|
||||
config['publish_topic'] = pub_topic
|
||||
return eventstream.setup(self.hass, {eventstream.DOMAIN: config})
|
||||
|
||||
def test_setup_succeeds(self):
|
||||
self.assertTrue(self.add_eventstream())
|
||||
|
||||
def test_setup_with_pub(self):
|
||||
# Should start off with no listeners for all events
|
||||
self.assertEqual(self.hass.bus.listeners.get('*'), None)
|
||||
|
||||
self.assertTrue(self.add_eventstream(pub_topic='bar'))
|
||||
self.hass.pool.block_till_done()
|
||||
|
||||
# Verify that the event handler has been added as a listener
|
||||
self.assertEqual(self.hass.bus.listeners.get('*'), 1)
|
||||
|
||||
@patch('homeassistant.components.mqtt.subscribe')
|
||||
def test_subscribe(self, mock_sub):
|
||||
sub_topic = 'foo'
|
||||
self.assertTrue(self.add_eventstream(sub_topic=sub_topic))
|
||||
self.hass.pool.block_till_done()
|
||||
|
||||
# Verify that the this entity was subscribed to the topic
|
||||
mock_sub.assert_called_with(self.hass, sub_topic, ANY)
|
||||
|
||||
@patch('homeassistant.components.mqtt.publish')
|
||||
@patch('homeassistant.core.dt_util.datetime_to_str')
|
||||
def test_state_changed_event_sends_message(self, mock_datetime, mock_pub):
|
||||
now = '00:19:19 11-01-2016'
|
||||
e_id = 'fake.entity'
|
||||
pub_topic = 'bar'
|
||||
mock_datetime.return_value = now
|
||||
|
||||
# Add the eventstream component for publishing events
|
||||
self.assertTrue(self.add_eventstream(pub_topic=pub_topic))
|
||||
self.hass.pool.block_till_done()
|
||||
|
||||
# Reset the mock because it will have already gotten calls for the
|
||||
# mqtt_eventstream state change on initialization, etc.
|
||||
mock_pub.reset_mock()
|
||||
|
||||
# Set a state of an entity
|
||||
mock_state_change_event(self.hass, State(e_id, 'on'))
|
||||
self.hass.pool.block_till_done()
|
||||
|
||||
# The order of the JSON is indeterminate,
|
||||
# so first just check that publish was called
|
||||
mock_pub.assert_called_with(self.hass, pub_topic, ANY)
|
||||
self.assertTrue(mock_pub.called)
|
||||
|
||||
# Get the actual call to publish and make sure it was the one
|
||||
# we were looking for
|
||||
msg = mock_pub.call_args[0][2]
|
||||
event = {}
|
||||
event['event_type'] = EVENT_STATE_CHANGED
|
||||
new_state = {
|
||||
"last_updated": now,
|
||||
"state": "on",
|
||||
"entity_id": e_id,
|
||||
"attributes": {},
|
||||
"last_changed": now
|
||||
}
|
||||
event['event_data'] = {"new_state": new_state, "entity_id": e_id}
|
||||
|
||||
# Verify that the message received was that expected
|
||||
self.assertEqual(json.loads(msg), event)
|
||||
|
||||
@patch('homeassistant.components.mqtt.publish')
|
||||
def test_time_event_does_not_send_message(self, mock_pub):
|
||||
self.assertTrue(self.add_eventstream(pub_topic='bar'))
|
||||
self.hass.pool.block_till_done()
|
||||
|
||||
# Reset the mock because it will have already gotten calls for the
|
||||
# mqtt_eventstream state change on initialization, etc.
|
||||
mock_pub.reset_mock()
|
||||
|
||||
fire_time_changed(self.hass, dt_util.utcnow())
|
||||
self.assertFalse(mock_pub.called)
|
||||
|
||||
def test_receiving_remote_event_fires_hass_event(self):
|
||||
sub_topic = 'foo'
|
||||
self.assertTrue(self.add_eventstream(sub_topic=sub_topic))
|
||||
self.hass.pool.block_till_done()
|
||||
|
||||
calls = []
|
||||
self.hass.bus.listen_once('test_event', lambda _: calls.append(1))
|
||||
self.hass.pool.block_till_done()
|
||||
|
||||
payload = json.dumps(
|
||||
{'event_type': 'test_event', 'event_data': {}},
|
||||
cls=JSONEncoder
|
||||
)
|
||||
fire_mqtt_message(self.hass, sub_topic, payload)
|
||||
self.hass.pool.block_till_done()
|
||||
|
||||
self.assertEqual(1, len(calls))
|
Loading…
x
Reference in New Issue
Block a user