From 8ace6566573df10aa35402ab424b70a19816a8bc Mon Sep 17 00:00:00 2001 From: Moonshot Date: Sat, 9 Jan 2016 23:18:46 -0500 Subject: [PATCH] Create mqtt eventstream component --- homeassistant/components/mqtt_eventstream.py | 95 +++++++++++++ tests/components/test_mqtt_eventstream.py | 139 +++++++++++++++++++ 2 files changed, 234 insertions(+) create mode 100644 homeassistant/components/mqtt_eventstream.py create mode 100644 tests/components/test_mqtt_eventstream.py diff --git a/homeassistant/components/mqtt_eventstream.py b/homeassistant/components/mqtt_eventstream.py new file mode 100644 index 00000000000..e52124182ff --- /dev/null +++ b/homeassistant/components/mqtt_eventstream.py @@ -0,0 +1,95 @@ +""" +homeassistant.components.mqtt_eventstream +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Connect two Home Assistant instances via mqtt. + +Configuration: + +To use the mqtt_eventstream component you will need to add the following to +your configuration.yaml file. + +If you do not specify a publish_topic you will not forward events to the queue. +If you do not specify a subscribe_topic then you will not receive events from +the remote server. + +mqtt_eventstream: + publish_topic: MyServerName + subscribe_topic: OtherHaServerName +""" +import json +from homeassistant.core import EventOrigin, State +from homeassistant.const import ( + MATCH_ALL, + EVENT_TIME_CHANGED, + EVENT_CALL_SERVICE, + EVENT_SERVICE_EXECUTED, + EVENT_STATE_CHANGED, +) + +import homeassistant.loader as loader +from homeassistant.remote import JSONEncoder + +# The domain of your component. Should be equal to the name of your component +DOMAIN = "mqtt_eventstream" + +# List of component names (string) your component depends upon +DEPENDENCIES = ['mqtt'] + + +def setup(hass, config): + """ Setup our mqtt_eventstream component. """ + def _event_handler(event): + """ Handle events by publishing them on the mqtt queue. """ + if event.origin != EventOrigin.local: + return + if event.event_type in ( + EVENT_TIME_CHANGED, + EVENT_CALL_SERVICE, + EVENT_SERVICE_EXECUTED + ): + return + event = {'event_type': event.event_type, 'event_data': event.data} + msg = json.dumps(event, cls=JSONEncoder) + mqtt.publish(hass, pub_topic, msg) + + mqtt = loader.get_component('mqtt') + pub_topic = config[DOMAIN].get('publish_topic', None) + sub_topic = config[DOMAIN].get('subscribe_topic', None) + + # Only listen for local events if you are going to publish them + if (pub_topic): + hass.bus.listen(MATCH_ALL, _event_handler) + + # Process events from a remote server that are received on a queue + def _event_receiver(topic, payload, qos): + """ + A new MQTT message, published by the other HA instance, + has been received. + """ + # TODO error handling + event = json.loads(payload) + event_type = event.get('event_type') + event_data = event.get('event_data') + + # Special case handling for event STATE_CHANGED + # We will try to convert state dicts back to State objects + if event_type == EVENT_STATE_CHANGED and event_data: + for key in ('old_state', 'new_state'): + state = State.from_dict(event_data.get(key)) + + if state: + event_data[key] = state + + hass.bus.fire( + event_type, + event_data=event_data, + origin=EventOrigin.remote + ) + + # Only subscribe if you specified a topic + if (sub_topic): + mqtt.subscribe(hass, sub_topic, _event_receiver) + + hass.states.set('{domain}.initialized'.format(domain=DOMAIN), True) + # return boolean to indicate that initialization was successful + return True diff --git a/tests/components/test_mqtt_eventstream.py b/tests/components/test_mqtt_eventstream.py new file mode 100644 index 00000000000..5e1680ad2a4 --- /dev/null +++ b/tests/components/test_mqtt_eventstream.py @@ -0,0 +1,139 @@ +""" +tests.test_component_mqtt_eventstream +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Tests MQTT eventstream component. +""" +import json +import unittest +from unittest.mock import ANY, patch + +import homeassistant.components.mqtt_eventstream as eventstream +from homeassistant.const import EVENT_STATE_CHANGED +from homeassistant.core import State +from homeassistant.remote import JSONEncoder +import homeassistant.util.dt as dt_util + +from tests.common import ( + get_test_home_assistant, + mock_mqtt_component, + fire_mqtt_message, + mock_state_change_event, + fire_time_changed +) + + +class TestMqttEventStream(unittest.TestCase): + """ Test the MQTT eventstream module. """ + + def setUp(self): # pylint: disable=invalid-name + super(TestMqttEventStream, self).setUp() + self.hass = get_test_home_assistant() + self.mock_mqtt = mock_mqtt_component(self.hass) + + def tearDown(self): # pylint: disable=invalid-name + """ Stop down stuff we started. """ + self.hass.stop() + + def add_eventstream(self, sub_topic=None, pub_topic=None): + """ Add a mqtt_eventstream component to the hass. """ + config = {} + if sub_topic: + config['subscribe_topic'] = sub_topic + if pub_topic: + config['publish_topic'] = pub_topic + return eventstream.setup(self.hass, {eventstream.DOMAIN: config}) + + def test_setup_succeeds(self): + self.assertTrue(self.add_eventstream()) + + def test_setup_with_pub(self): + # Should start off with no listeners for all events + self.assertEqual(self.hass.bus.listeners.get('*'), None) + + self.assertTrue(self.add_eventstream(pub_topic='bar')) + self.hass.pool.block_till_done() + + # Verify that the event handler has been added as a listener + self.assertEqual(self.hass.bus.listeners.get('*'), 1) + + @patch('homeassistant.components.mqtt.subscribe') + def test_subscribe(self, mock_sub): + sub_topic = 'foo' + self.assertTrue(self.add_eventstream(sub_topic=sub_topic)) + self.hass.pool.block_till_done() + + # Verify that the this entity was subscribed to the topic + mock_sub.assert_called_with(self.hass, sub_topic, ANY) + + @patch('homeassistant.components.mqtt.publish') + @patch('homeassistant.core.dt_util.datetime_to_str') + def test_state_changed_event_sends_message(self, mock_datetime, mock_pub): + now = '00:19:19 11-01-2016' + e_id = 'fake.entity' + pub_topic = 'bar' + mock_datetime.return_value = now + + # Add the eventstream component for publishing events + self.assertTrue(self.add_eventstream(pub_topic=pub_topic)) + self.hass.pool.block_till_done() + + # Reset the mock because it will have already gotten calls for the + # mqtt_eventstream state change on initialization, etc. + mock_pub.reset_mock() + + # Set a state of an entity + mock_state_change_event(self.hass, State(e_id, 'on')) + self.hass.pool.block_till_done() + + # The order of the JSON is indeterminate, + # so first just check that publish was called + mock_pub.assert_called_with(self.hass, pub_topic, ANY) + self.assertTrue(mock_pub.called) + + # Get the actual call to publish and make sure it was the one + # we were looking for + msg = mock_pub.call_args[0][2] + event = {} + event['event_type'] = EVENT_STATE_CHANGED + new_state = { + "last_updated": now, + "state": "on", + "entity_id": e_id, + "attributes": {}, + "last_changed": now + } + event['event_data'] = {"new_state": new_state, "entity_id": e_id} + + # Verify that the message received was that expected + self.assertEqual(json.loads(msg), event) + + @patch('homeassistant.components.mqtt.publish') + def test_time_event_does_not_send_message(self, mock_pub): + self.assertTrue(self.add_eventstream(pub_topic='bar')) + self.hass.pool.block_till_done() + + # Reset the mock because it will have already gotten calls for the + # mqtt_eventstream state change on initialization, etc. + mock_pub.reset_mock() + + fire_time_changed(self.hass, dt_util.utcnow()) + self.assertFalse(mock_pub.called) + + def test_receiving_remote_event_fires_hass_event(self): + sub_topic = 'foo' + self.assertTrue(self.add_eventstream(sub_topic=sub_topic)) + self.hass.pool.block_till_done() + + calls = [] + self.hass.bus.listen_once('test_event', lambda _: calls.append(1)) + self.hass.pool.block_till_done() + + payload = json.dumps( + {'event_type': 'test_event', 'event_data': {}}, + cls=JSONEncoder + ) + fire_mqtt_message(self.hass, sub_topic, payload) + self.hass.pool.block_till_done() + + self.assertEqual(1, len(calls))