diff --git a/homeassistant/components/mqtt_eventstream.py b/homeassistant/components/mqtt_eventstream.py new file mode 100644 index 00000000000..a90e4b0d42a --- /dev/null +++ b/homeassistant/components/mqtt_eventstream.py @@ -0,0 +1,114 @@ +""" +homeassistant.components.mqtt_eventstream +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Connect two Home Assistant instances via mqtt. + +Configuration: + +To use the mqtt_eventstream component you will need to add the following to +your configuration.yaml file. + +If you do not specify a publish_topic you will not forward events to the queue. +If you do not specify a subscribe_topic then you will not receive events from +the remote server. + +mqtt_eventstream: + publish_topic: MyServerName + subscribe_topic: OtherHaServerName +""" +import json +from homeassistant.core import EventOrigin, State +from homeassistant.components.mqtt import DOMAIN as MQTT_DOMAIN +from homeassistant.components.mqtt import SERVICE_PUBLISH as MQTT_SVC_PUBLISH +from homeassistant.const import ( + MATCH_ALL, + EVENT_TIME_CHANGED, + EVENT_CALL_SERVICE, + EVENT_SERVICE_EXECUTED, + EVENT_STATE_CHANGED, +) +import homeassistant.loader as loader +from homeassistant.remote import JSONEncoder + +# The domain of your component. Should be equal to the name of your component +DOMAIN = "mqtt_eventstream" + +# List of component names (string) your component depends upon +DEPENDENCIES = ['mqtt'] + + +def setup(hass, config): + """ Setup our mqtt_eventstream component. """ + mqtt = loader.get_component('mqtt') + pub_topic = config[DOMAIN].get('publish_topic', None) + sub_topic = config[DOMAIN].get('subscribe_topic', None) + + def _event_publisher(event): + """ Handle events by publishing them on the mqtt queue. """ + if event.origin != EventOrigin.local: + return + if event.event_type == EVENT_TIME_CHANGED: + return + + # Filter out the events that were triggered by publishing + # to the MQTT topic, or you will end up in an infinite loop. + if event.event_type == EVENT_CALL_SERVICE: + if ( + event.data.get('domain') == MQTT_DOMAIN and + event.data.get('service') == MQTT_SVC_PUBLISH and + event.data.get('topic') == pub_topic + ): + return + + # Filter out all the "event service executed" events because they + # are only used internally by core as callbacks for blocking + # during the interval while a service is being executed. + # They will serve no purpose to the external system, + # and thus are unnecessary traffic. + # And at any rate it would cause an infinite loop to publish them + # because publishing to an MQTT topic itself triggers one. + if event.event_type == EVENT_SERVICE_EXECUTED: + return + + event_info = {'event_type': event.event_type, 'event_data': event.data} + msg = json.dumps(event_info, cls=JSONEncoder) + mqtt.publish(hass, pub_topic, msg) + + # Only listen for local events if you are going to publish them + if pub_topic: + hass.bus.listen(MATCH_ALL, _event_publisher) + + # Process events from a remote server that are received on a queue + def _event_receiver(topic, payload, qos): + """ + Receive events published by the other HA instance and fire + them on this hass instance. + """ + event = json.loads(payload) + event_type = event.get('event_type') + event_data = event.get('event_data') + + # Special case handling for event STATE_CHANGED + # We will try to convert state dicts back to State objects + # Copied over from the _handle_api_post_events_event method + # of the api component. + if event_type == EVENT_STATE_CHANGED and event_data: + for key in ('old_state', 'new_state'): + state = State.from_dict(event_data.get(key)) + + if state: + event_data[key] = state + + hass.bus.fire( + event_type, + event_data=event_data, + origin=EventOrigin.remote + ) + + # Only subscribe if you specified a topic + if sub_topic: + mqtt.subscribe(hass, sub_topic, _event_receiver) + + hass.states.set('{domain}.initialized'.format(domain=DOMAIN), True) + # return boolean to indicate that initialization was successful + return True diff --git a/tests/components/test_mqtt_eventstream.py b/tests/components/test_mqtt_eventstream.py new file mode 100644 index 00000000000..5e1680ad2a4 --- /dev/null +++ b/tests/components/test_mqtt_eventstream.py @@ -0,0 +1,139 @@ +""" +tests.test_component_mqtt_eventstream +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Tests MQTT eventstream component. +""" +import json +import unittest +from unittest.mock import ANY, patch + +import homeassistant.components.mqtt_eventstream as eventstream +from homeassistant.const import EVENT_STATE_CHANGED +from homeassistant.core import State +from homeassistant.remote import JSONEncoder +import homeassistant.util.dt as dt_util + +from tests.common import ( + get_test_home_assistant, + mock_mqtt_component, + fire_mqtt_message, + mock_state_change_event, + fire_time_changed +) + + +class TestMqttEventStream(unittest.TestCase): + """ Test the MQTT eventstream module. """ + + def setUp(self): # pylint: disable=invalid-name + super(TestMqttEventStream, self).setUp() + self.hass = get_test_home_assistant() + self.mock_mqtt = mock_mqtt_component(self.hass) + + def tearDown(self): # pylint: disable=invalid-name + """ Stop down stuff we started. """ + self.hass.stop() + + def add_eventstream(self, sub_topic=None, pub_topic=None): + """ Add a mqtt_eventstream component to the hass. """ + config = {} + if sub_topic: + config['subscribe_topic'] = sub_topic + if pub_topic: + config['publish_topic'] = pub_topic + return eventstream.setup(self.hass, {eventstream.DOMAIN: config}) + + def test_setup_succeeds(self): + self.assertTrue(self.add_eventstream()) + + def test_setup_with_pub(self): + # Should start off with no listeners for all events + self.assertEqual(self.hass.bus.listeners.get('*'), None) + + self.assertTrue(self.add_eventstream(pub_topic='bar')) + self.hass.pool.block_till_done() + + # Verify that the event handler has been added as a listener + self.assertEqual(self.hass.bus.listeners.get('*'), 1) + + @patch('homeassistant.components.mqtt.subscribe') + def test_subscribe(self, mock_sub): + sub_topic = 'foo' + self.assertTrue(self.add_eventstream(sub_topic=sub_topic)) + self.hass.pool.block_till_done() + + # Verify that the this entity was subscribed to the topic + mock_sub.assert_called_with(self.hass, sub_topic, ANY) + + @patch('homeassistant.components.mqtt.publish') + @patch('homeassistant.core.dt_util.datetime_to_str') + def test_state_changed_event_sends_message(self, mock_datetime, mock_pub): + now = '00:19:19 11-01-2016' + e_id = 'fake.entity' + pub_topic = 'bar' + mock_datetime.return_value = now + + # Add the eventstream component for publishing events + self.assertTrue(self.add_eventstream(pub_topic=pub_topic)) + self.hass.pool.block_till_done() + + # Reset the mock because it will have already gotten calls for the + # mqtt_eventstream state change on initialization, etc. + mock_pub.reset_mock() + + # Set a state of an entity + mock_state_change_event(self.hass, State(e_id, 'on')) + self.hass.pool.block_till_done() + + # The order of the JSON is indeterminate, + # so first just check that publish was called + mock_pub.assert_called_with(self.hass, pub_topic, ANY) + self.assertTrue(mock_pub.called) + + # Get the actual call to publish and make sure it was the one + # we were looking for + msg = mock_pub.call_args[0][2] + event = {} + event['event_type'] = EVENT_STATE_CHANGED + new_state = { + "last_updated": now, + "state": "on", + "entity_id": e_id, + "attributes": {}, + "last_changed": now + } + event['event_data'] = {"new_state": new_state, "entity_id": e_id} + + # Verify that the message received was that expected + self.assertEqual(json.loads(msg), event) + + @patch('homeassistant.components.mqtt.publish') + def test_time_event_does_not_send_message(self, mock_pub): + self.assertTrue(self.add_eventstream(pub_topic='bar')) + self.hass.pool.block_till_done() + + # Reset the mock because it will have already gotten calls for the + # mqtt_eventstream state change on initialization, etc. + mock_pub.reset_mock() + + fire_time_changed(self.hass, dt_util.utcnow()) + self.assertFalse(mock_pub.called) + + def test_receiving_remote_event_fires_hass_event(self): + sub_topic = 'foo' + self.assertTrue(self.add_eventstream(sub_topic=sub_topic)) + self.hass.pool.block_till_done() + + calls = [] + self.hass.bus.listen_once('test_event', lambda _: calls.append(1)) + self.hass.pool.block_till_done() + + payload = json.dumps( + {'event_type': 'test_event', 'event_data': {}}, + cls=JSONEncoder + ) + fire_mqtt_message(self.hass, sub_topic, payload) + self.hass.pool.block_till_done() + + self.assertEqual(1, len(calls))