Cleanup use of MQTT in emulated_hue tests (#4068)

* Use unix newlines on test_emulated_hue

This commit switches the test_emulated_hue module to use unix newlines
instead of the DOS style that were there before. (using dos2unix on
the file) This makes it consistent with the other files in the repo.

* Cleanup emulated_hue tests

Previously these tests relied on the mqtt light platform as test devices
to control with the emulated hue. However, this was pretty heavyweight
and required running an MQTT broker in the tests. Instead this commit
switches it to use the demo light platform which is strictly in memory.

Fixes #3549
This commit is contained in:
Matthew Treinish 2016-10-27 02:33:43 -04:00 committed by Paulus Schoutsen
parent 4fb0b27310
commit 5d3956ea98

View File

@ -1,472 +1,425 @@
"""The tests for the emulated Hue component.""" """The tests for the emulated Hue component."""
import time import time
import json import json
import threading import threading
import asyncio import asyncio
import unittest import unittest
import requests import requests
from homeassistant import bootstrap, const, core from homeassistant import bootstrap, const, core
import homeassistant.components as core_components import homeassistant.components as core_components
from homeassistant.components import emulated_hue, http, light, mqtt from homeassistant.components import emulated_hue, http, light
from homeassistant.const import STATE_ON, STATE_OFF from homeassistant.const import STATE_ON, STATE_OFF
from homeassistant.components.emulated_hue import ( from homeassistant.components.emulated_hue import (
HUE_API_STATE_ON, HUE_API_STATE_BRI) HUE_API_STATE_ON, HUE_API_STATE_BRI)
from tests.common import get_test_instance_port, get_test_home_assistant from tests.common import get_test_instance_port, get_test_home_assistant
HTTP_SERVER_PORT = get_test_instance_port() HTTP_SERVER_PORT = get_test_instance_port()
BRIDGE_SERVER_PORT = get_test_instance_port() BRIDGE_SERVER_PORT = get_test_instance_port()
MQTT_BROKER_PORT = get_test_instance_port()
BRIDGE_URL_BASE = "http://127.0.0.1:{}".format(BRIDGE_SERVER_PORT) + "{}"
BRIDGE_URL_BASE = "http://127.0.0.1:{}".format(BRIDGE_SERVER_PORT) + "{}" JSON_HEADERS = {const.HTTP_HEADER_CONTENT_TYPE: const.CONTENT_TYPE_JSON}
JSON_HEADERS = {const.HTTP_HEADER_CONTENT_TYPE: const.CONTENT_TYPE_JSON}
mqtt_broker = None def setup_hass_instance(emulated_hue_config):
"""Setup the Home Assistant instance to test."""
hass = get_test_home_assistant()
def setUpModule():
"""Setup things to be run when tests are started.""" # We need to do this to get access to homeassistant/turn_(on,off)
global mqtt_broker core_components.setup(hass, {core.DOMAIN: {}})
mqtt_broker = MQTTBroker('127.0.0.1', MQTT_BROKER_PORT) bootstrap.setup_component(
mqtt_broker.start() hass, http.DOMAIN,
{http.DOMAIN: {http.CONF_SERVER_PORT: HTTP_SERVER_PORT}})
def tearDownModule(): bootstrap.setup_component(hass, emulated_hue.DOMAIN, emulated_hue_config)
"""Stop everything that was started."""
global mqtt_broker return hass
mqtt_broker.stop()
def start_hass_instance(hass):
"""Start the Home Assistant instance to test."""
def setup_hass_instance(emulated_hue_config): hass.start()
"""Setup the Home Assistant instance to test.""" time.sleep(0.05)
hass = get_test_home_assistant()
# We need to do this to get access to homeassistant/turn_(on,off) class TestEmulatedHue(unittest.TestCase):
core_components.setup(hass, {core.DOMAIN: {}}) """Test the emulated Hue component."""
bootstrap.setup_component( hass = None
hass, http.DOMAIN,
{http.DOMAIN: {http.CONF_SERVER_PORT: HTTP_SERVER_PORT}}) @classmethod
def setUpClass(cls):
bootstrap.setup_component(hass, emulated_hue.DOMAIN, emulated_hue_config) """Setup the class."""
cls.hass = setup_hass_instance({
return hass emulated_hue.DOMAIN: {
emulated_hue.CONF_LISTEN_PORT: BRIDGE_SERVER_PORT
}})
def start_hass_instance(hass):
"""Start the Home Assistant instance to test.""" start_hass_instance(cls.hass)
hass.start()
time.sleep(0.05) @classmethod
def tearDownClass(cls):
"""Stop the class."""
class TestEmulatedHue(unittest.TestCase): cls.hass.stop()
"""Test the emulated Hue component."""
def test_description_xml(self):
hass = None """Test the description."""
import xml.etree.ElementTree as ET
@classmethod
def setUpClass(cls): result = requests.get(
"""Setup the class.""" BRIDGE_URL_BASE.format('/description.xml'), timeout=5)
cls.hass = setup_hass_instance({
emulated_hue.DOMAIN: { self.assertEqual(result.status_code, 200)
emulated_hue.CONF_LISTEN_PORT: BRIDGE_SERVER_PORT self.assertTrue('text/xml' in result.headers['content-type'])
}})
# Make sure the XML is parsable
start_hass_instance(cls.hass) try:
ET.fromstring(result.text)
@classmethod except:
def tearDownClass(cls): self.fail('description.xml is not valid XML!')
"""Stop the class."""
cls.hass.stop() def test_create_username(self):
"""Test the creation of an username."""
def test_description_xml(self): request_json = {'devicetype': 'my_device'}
"""Test the description."""
import xml.etree.ElementTree as ET result = requests.post(
BRIDGE_URL_BASE.format('/api'), data=json.dumps(request_json),
result = requests.get( timeout=5)
BRIDGE_URL_BASE.format('/description.xml'), timeout=5)
self.assertEqual(result.status_code, 200)
self.assertEqual(result.status_code, 200) self.assertTrue('application/json' in result.headers['content-type'])
self.assertTrue('text/xml' in result.headers['content-type'])
resp_json = result.json()
# Make sure the XML is parsable success_json = resp_json[0]
try:
ET.fromstring(result.text) self.assertTrue('success' in success_json)
except: self.assertTrue('username' in success_json['success'])
self.fail('description.xml is not valid XML!')
def test_valid_username_request(self):
def test_create_username(self): """Test request with a valid username."""
"""Test the creation of an username.""" request_json = {'invalid_key': 'my_device'}
request_json = {'devicetype': 'my_device'}
result = requests.post(
result = requests.post( BRIDGE_URL_BASE.format('/api'), data=json.dumps(request_json),
BRIDGE_URL_BASE.format('/api'), data=json.dumps(request_json), timeout=5)
timeout=5)
self.assertEqual(result.status_code, 400)
self.assertEqual(result.status_code, 200)
self.assertTrue('application/json' in result.headers['content-type'])
class TestEmulatedHueExposedByDefault(unittest.TestCase):
resp_json = result.json() """Test class for emulated hue component."""
success_json = resp_json[0]
@classmethod
self.assertTrue('success' in success_json) def setUpClass(cls):
self.assertTrue('username' in success_json['success']) """Setup the class."""
cls.hass = setup_hass_instance({
def test_valid_username_request(self): emulated_hue.DOMAIN: {
"""Test request with a valid username.""" emulated_hue.CONF_LISTEN_PORT: BRIDGE_SERVER_PORT,
request_json = {'invalid_key': 'my_device'} emulated_hue.CONF_EXPOSE_BY_DEFAULT: True
}
result = requests.post( })
BRIDGE_URL_BASE.format('/api'), data=json.dumps(request_json),
timeout=5) bootstrap.setup_component(cls.hass, light.DOMAIN, {
'light': [
self.assertEqual(result.status_code, 400) {
'platform': 'demo',
}
class TestEmulatedHueExposedByDefault(unittest.TestCase): ]
"""Test class for emulated hue component.""" })
@classmethod start_hass_instance(cls.hass)
def setUpClass(cls):
"""Setup the class.""" # Kitchen light is explicitly excluded from being exposed
cls.hass = setup_hass_instance({ kitchen_light_entity = cls.hass.states.get('light.kitchen_lights')
emulated_hue.DOMAIN: { attrs = dict(kitchen_light_entity.attributes)
emulated_hue.CONF_LISTEN_PORT: BRIDGE_SERVER_PORT, attrs[emulated_hue.ATTR_EMULATED_HUE] = False
emulated_hue.CONF_EXPOSE_BY_DEFAULT: True cls.hass.states.set(
} kitchen_light_entity.entity_id, kitchen_light_entity.state,
}) attributes=attrs)
bootstrap.setup_component(cls.hass, mqtt.DOMAIN, { @classmethod
'mqtt': { def tearDownClass(cls):
'broker': '127.0.0.1', """Stop the class."""
'port': MQTT_BROKER_PORT cls.hass.stop()
}
}) def test_discover_lights(self):
"""Test the discovery of lights."""
bootstrap.setup_component(cls.hass, light.DOMAIN, { result = requests.get(
'light': [ BRIDGE_URL_BASE.format('/api/username/lights'), timeout=5)
{
'platform': 'mqtt', self.assertEqual(result.status_code, 200)
'name': 'Office light', self.assertTrue('application/json' in result.headers['content-type'])
'state_topic': 'office/rgb1/light/status',
'command_topic': 'office/rgb1/light/switch', result_json = result.json()
'brightness_state_topic': 'office/rgb1/brightness/status',
'brightness_command_topic': 'office/rgb1/brightness/set', # Make sure the lights we added to the config are there
'optimistic': True self.assertTrue('light.ceiling_lights' in result_json)
}, self.assertTrue('light.bed_light' in result_json)
{ self.assertTrue('light.kitchen_lights' not in result_json)
'platform': 'mqtt',
'name': 'Bedroom light', def test_get_light_state(self):
'state_topic': 'bedroom/rgb1/light/status', """Test the getting of light state."""
'command_topic': 'bedroom/rgb1/light/switch', # Turn office light on and set to 127 brightness
'brightness_state_topic': 'bedroom/rgb1/brightness/status', self.hass.services.call(
'brightness_command_topic': 'bedroom/rgb1/brightness/set', light.DOMAIN, const.SERVICE_TURN_ON,
'optimistic': True {
}, const.ATTR_ENTITY_ID: 'light.ceiling_lights',
{ light.ATTR_BRIGHTNESS: 127
'platform': 'mqtt', },
'name': 'Kitchen light', blocking=True)
'state_topic': 'kitchen/rgb1/light/status',
'command_topic': 'kitchen/rgb1/light/switch', office_json = self.perform_get_light_state('light.ceiling_lights', 200)
'brightness_state_topic': 'kitchen/rgb1/brightness/status',
'brightness_command_topic': 'kitchen/rgb1/brightness/set', self.assertEqual(office_json['state'][HUE_API_STATE_ON], True)
'optimistic': True self.assertEqual(office_json['state'][HUE_API_STATE_BRI], 127)
}
] # Turn bedroom light off
}) self.hass.services.call(
light.DOMAIN, const.SERVICE_TURN_OFF,
start_hass_instance(cls.hass) {
const.ATTR_ENTITY_ID: 'light.bed_light'
# Kitchen light is explicitly excluded from being exposed },
kitchen_light_entity = cls.hass.states.get('light.kitchen_light') blocking=True)
attrs = dict(kitchen_light_entity.attributes)
attrs[emulated_hue.ATTR_EMULATED_HUE] = False bedroom_json = self.perform_get_light_state('light.bed_light', 200)
cls.hass.states.set(
kitchen_light_entity.entity_id, kitchen_light_entity.state, self.assertEqual(bedroom_json['state'][HUE_API_STATE_ON], False)
attributes=attrs) self.assertEqual(bedroom_json['state'][HUE_API_STATE_BRI], 0)
@classmethod # Make sure kitchen light isn't accessible
def tearDownClass(cls): kitchen_url = '/api/username/lights/{}'.format('light.kitchen_lights')
"""Stop the class.""" kitchen_result = requests.get(
cls.hass.stop() BRIDGE_URL_BASE.format(kitchen_url), timeout=5)
def test_discover_lights(self): self.assertEqual(kitchen_result.status_code, 404)
"""Test the discovery of lights."""
result = requests.get( def test_put_light_state(self):
BRIDGE_URL_BASE.format('/api/username/lights'), timeout=5) """Test the seeting of light states."""
self.perform_put_test_on_ceiling_lights()
self.assertEqual(result.status_code, 200)
self.assertTrue('application/json' in result.headers['content-type']) # Turn the bedroom light on first
self.hass.services.call(
result_json = result.json() light.DOMAIN, const.SERVICE_TURN_ON,
{const.ATTR_ENTITY_ID: 'light.bed_light',
# Make sure the lights we added to the config are there light.ATTR_BRIGHTNESS: 153},
self.assertTrue('light.office_light' in result_json) blocking=True)
self.assertTrue('light.bedroom_light' in result_json)
self.assertTrue('light.kitchen_light' not in result_json) bed_light = self.hass.states.get('light.bed_light')
self.assertEqual(bed_light.state, STATE_ON)
def test_get_light_state(self): self.assertEqual(bed_light.attributes[light.ATTR_BRIGHTNESS], 153)
"""Test the getting of light state."""
# Turn office light on and set to 127 brightness # Go through the API to turn it off
self.hass.services.call( bedroom_result = self.perform_put_light_state(
light.DOMAIN, const.SERVICE_TURN_ON, 'light.bed_light', False)
{
const.ATTR_ENTITY_ID: 'light.office_light', bedroom_result_json = bedroom_result.json()
light.ATTR_BRIGHTNESS: 127
}, self.assertEqual(bedroom_result.status_code, 200)
blocking=True) self.assertTrue(
'application/json' in bedroom_result.headers['content-type'])
office_json = self.perform_get_light_state('light.office_light', 200)
self.assertEqual(len(bedroom_result_json), 1)
self.assertEqual(office_json['state'][HUE_API_STATE_ON], True)
self.assertEqual(office_json['state'][HUE_API_STATE_BRI], 127) # Check to make sure the state changed
bed_light = self.hass.states.get('light.bed_light')
# Turn bedroom light off self.assertEqual(bed_light.state, STATE_OFF)
self.hass.services.call(
light.DOMAIN, const.SERVICE_TURN_OFF, # Make sure we can't change the kitchen light state
{ kitchen_result = self.perform_put_light_state(
const.ATTR_ENTITY_ID: 'light.bedroom_light' 'light.kitchen_light', True)
}, self.assertEqual(kitchen_result.status_code, 404)
blocking=True)
def test_put_with_form_urlencoded_content_type(self):
bedroom_json = self.perform_get_light_state('light.bedroom_light', 200) """Test the form with urlencoded content."""
# Needed for Alexa
self.assertEqual(bedroom_json['state'][HUE_API_STATE_ON], False) self.perform_put_test_on_ceiling_lights(
self.assertEqual(bedroom_json['state'][HUE_API_STATE_BRI], 0) 'application/x-www-form-urlencoded')
# Make sure kitchen light isn't accessible # Make sure we fail gracefully when we can't parse the data
kitchen_url = '/api/username/lights/{}'.format('light.kitchen_light') data = {'key1': 'value1', 'key2': 'value2'}
kitchen_result = requests.get( result = requests.put(
BRIDGE_URL_BASE.format(kitchen_url), timeout=5) BRIDGE_URL_BASE.format(
'/api/username/lights/{}/state'.format(
self.assertEqual(kitchen_result.status_code, 404) "light.ceiling_lights")), data=data)
def test_put_light_state(self): self.assertEqual(result.status_code, 400)
"""Test the seeting of light states."""
self.perform_put_test_on_office_light() def test_entity_not_found(self):
"""Test for entity which are not found."""
# Turn the bedroom light on first result = requests.get(
self.hass.services.call( BRIDGE_URL_BASE.format(
light.DOMAIN, const.SERVICE_TURN_ON, '/api/username/lights/{}'.format("not.existant_entity")),
{const.ATTR_ENTITY_ID: 'light.bedroom_light', timeout=5)
light.ATTR_BRIGHTNESS: 153},
blocking=True) self.assertEqual(result.status_code, 404)
bedroom_light = self.hass.states.get('light.bedroom_light') result = requests.put(
self.assertEqual(bedroom_light.state, STATE_ON) BRIDGE_URL_BASE.format(
self.assertEqual(bedroom_light.attributes[light.ATTR_BRIGHTNESS], 153) '/api/username/lights/{}/state'.format("non.existant_entity")),
timeout=5)
# Go through the API to turn it off
bedroom_result = self.perform_put_light_state( self.assertEqual(result.status_code, 404)
'light.bedroom_light', False)
def test_allowed_methods(self):
bedroom_result_json = bedroom_result.json() """Test the allowed methods."""
result = requests.get(
self.assertEqual(bedroom_result.status_code, 200) BRIDGE_URL_BASE.format(
self.assertTrue( '/api/username/lights/{}/state'.format(
'application/json' in bedroom_result.headers['content-type']) "light.ceiling_lights")))
self.assertEqual(len(bedroom_result_json), 1) self.assertEqual(result.status_code, 405)
# Check to make sure the state changed result = requests.put(
bedroom_light = self.hass.states.get('light.bedroom_light') BRIDGE_URL_BASE.format(
self.assertEqual(bedroom_light.state, STATE_OFF) '/api/username/lights/{}'.format("light.ceiling_lights")),
data={'key1': 'value1'})
# Make sure we can't change the kitchen light state
kitchen_result = self.perform_put_light_state( self.assertEqual(result.status_code, 405)
'light.kitchen_light', True)
self.assertEqual(kitchen_result.status_code, 404) result = requests.put(
BRIDGE_URL_BASE.format('/api/username/lights'),
def test_put_with_form_urlencoded_content_type(self): data={'key1': 'value1'})
"""Test the form with urlencoded content."""
# Needed for Alexa self.assertEqual(result.status_code, 405)
self.perform_put_test_on_office_light(
'application/x-www-form-urlencoded') def test_proper_put_state_request(self):
"""Test the request to set the state."""
# Make sure we fail gracefully when we can't parse the data # Test proper on value parsing
data = {'key1': 'value1', 'key2': 'value2'} result = requests.put(
result = requests.put( BRIDGE_URL_BASE.format(
BRIDGE_URL_BASE.format( '/api/username/lights/{}/state'.format(
'/api/username/lights/{}/state'.format("light.office_light")), "light.ceiling_lights")),
data=data) data=json.dumps({HUE_API_STATE_ON: 1234}))
self.assertEqual(result.status_code, 400) self.assertEqual(result.status_code, 400)
def test_entity_not_found(self): # Test proper brightness value parsing
"""Test for entity which are not found.""" result = requests.put(
result = requests.get( BRIDGE_URL_BASE.format(
BRIDGE_URL_BASE.format( '/api/username/lights/{}/state'.format(
'/api/username/lights/{}'.format("not.existant_entity")), "light.ceiling_lights")), data=json.dumps({
timeout=5) HUE_API_STATE_ON: True,
HUE_API_STATE_BRI: 'Hello world!'
self.assertEqual(result.status_code, 404) }))
result = requests.put( self.assertEqual(result.status_code, 400)
BRIDGE_URL_BASE.format(
'/api/username/lights/{}/state'.format("non.existant_entity")), def perform_put_test_on_ceiling_lights(self,
timeout=5) content_type='application/json'):
"""Test the setting of a light."""
self.assertEqual(result.status_code, 404) # Turn the office light off first
self.hass.services.call(
def test_allowed_methods(self): light.DOMAIN, const.SERVICE_TURN_OFF,
"""Test the allowed methods.""" {const.ATTR_ENTITY_ID: 'light.ceiling_lights'},
result = requests.get( blocking=True)
BRIDGE_URL_BASE.format(
'/api/username/lights/{}/state'.format("light.office_light"))) ceiling_lights = self.hass.states.get('light.ceiling_lights')
self.assertEqual(ceiling_lights.state, STATE_OFF)
self.assertEqual(result.status_code, 405)
# Go through the API to turn it on
result = requests.put( office_result = self.perform_put_light_state(
BRIDGE_URL_BASE.format( 'light.ceiling_lights', True, 56, content_type)
'/api/username/lights/{}'.format("light.office_light")),
data={'key1': 'value1'}) office_result_json = office_result.json()
self.assertEqual(result.status_code, 405) self.assertEqual(office_result.status_code, 200)
self.assertTrue(
result = requests.put( 'application/json' in office_result.headers['content-type'])
BRIDGE_URL_BASE.format('/api/username/lights'),
data={'key1': 'value1'}) self.assertEqual(len(office_result_json), 2)
self.assertEqual(result.status_code, 405) # Check to make sure the state changed
ceiling_lights = self.hass.states.get('light.ceiling_lights')
def test_proper_put_state_request(self): self.assertEqual(ceiling_lights.state, STATE_ON)
"""Test the request to set the state.""" self.assertEqual(ceiling_lights.attributes[light.ATTR_BRIGHTNESS], 56)
# Test proper on value parsing
result = requests.put( def perform_get_light_state(self, entity_id, expected_status):
BRIDGE_URL_BASE.format( """Test the gettting of a light state."""
'/api/username/lights/{}/state'.format("light.office_light")), result = requests.get(
data=json.dumps({HUE_API_STATE_ON: 1234})) BRIDGE_URL_BASE.format(
'/api/username/lights/{}'.format(entity_id)), timeout=5)
self.assertEqual(result.status_code, 400)
self.assertEqual(result.status_code, expected_status)
# Test proper brightness value parsing
result = requests.put( if expected_status == 200:
BRIDGE_URL_BASE.format( self.assertTrue(
'/api/username/lights/{}/state'.format("light.office_light")), 'application/json' in result.headers['content-type'])
data=json.dumps({
HUE_API_STATE_ON: True, return result.json()
HUE_API_STATE_BRI: 'Hello world!'
})) return None
self.assertEqual(result.status_code, 400) def perform_put_light_state(self, entity_id, is_on, brightness=None,
content_type='application/json'):
def perform_put_test_on_office_light(self, """Test the setting of a light state."""
content_type='application/json'): url = BRIDGE_URL_BASE.format(
"""Test the setting of a light.""" '/api/username/lights/{}/state'.format(entity_id))
# Turn the office light off first
self.hass.services.call( req_headers = {'Content-Type': content_type}
light.DOMAIN, const.SERVICE_TURN_OFF,
{const.ATTR_ENTITY_ID: 'light.office_light'}, data = {HUE_API_STATE_ON: is_on}
blocking=True)
if brightness is not None:
office_light = self.hass.states.get('light.office_light') data[HUE_API_STATE_BRI] = brightness
self.assertEqual(office_light.state, STATE_OFF)
result = requests.put(
# Go through the API to turn it on url, data=json.dumps(data), timeout=5, headers=req_headers)
office_result = self.perform_put_light_state( return result
'light.office_light', True, 56, content_type)
office_result_json = office_result.json() class MQTTBroker(object):
"""Encapsulates an embedded MQTT broker."""
self.assertEqual(office_result.status_code, 200)
self.assertTrue( def __init__(self, host, port):
'application/json' in office_result.headers['content-type']) """Initialize a new instance."""
from hbmqtt.broker import Broker
self.assertEqual(len(office_result_json), 2)
self._loop = asyncio.new_event_loop()
# Check to make sure the state changed
office_light = self.hass.states.get('light.office_light') hbmqtt_config = {
self.assertEqual(office_light.state, STATE_ON) 'listeners': {
self.assertEqual(office_light.attributes[light.ATTR_BRIGHTNESS], 56) 'default': {
'max-connections': 50000,
def perform_get_light_state(self, entity_id, expected_status): 'type': 'tcp',
"""Test the gettting of a light state.""" 'bind': '{}:{}'.format(host, port)
result = requests.get( }
BRIDGE_URL_BASE.format( },
'/api/username/lights/{}'.format(entity_id)), timeout=5) 'auth': {
'plugins': ['auth.anonymous'],
self.assertEqual(result.status_code, expected_status) 'allow-anonymous': True
}
if expected_status == 200: }
self.assertTrue(
'application/json' in result.headers['content-type']) self._broker = Broker(config=hbmqtt_config, loop=self._loop)
return result.json() self._thread = threading.Thread(target=self._run_loop)
self._started_ev = threading.Event()
return None
def start(self):
def perform_put_light_state(self, entity_id, is_on, brightness=None, """Start the broker."""
content_type='application/json'): self._thread.start()
"""Test the setting of a light state.""" self._started_ev.wait()
url = BRIDGE_URL_BASE.format(
'/api/username/lights/{}/state'.format(entity_id)) def stop(self):
"""Stop the broker."""
req_headers = {'Content-Type': content_type} self._loop.call_soon_threadsafe(asyncio.async, self._broker.shutdown())
self._loop.call_soon_threadsafe(self._loop.stop)
data = {HUE_API_STATE_ON: is_on} self._thread.join()
if brightness is not None: def _run_loop(self):
data[HUE_API_STATE_BRI] = brightness """Run the loop."""
asyncio.set_event_loop(self._loop)
result = requests.put( self._loop.run_until_complete(self._broker_coroutine())
url, data=json.dumps(data), timeout=5, headers=req_headers)
return result self._started_ev.set()
self._loop.run_forever()
class MQTTBroker(object): self._loop.close()
"""Encapsulates an embedded MQTT broker."""
@asyncio.coroutine
def __init__(self, host, port): def _broker_coroutine(self):
"""Initialize a new instance.""" """The Broker coroutine."""
from hbmqtt.broker import Broker yield from self._broker.start()
self._loop = asyncio.new_event_loop()
hbmqtt_config = {
'listeners': {
'default': {
'max-connections': 50000,
'type': 'tcp',
'bind': '{}:{}'.format(host, port)
}
},
'auth': {
'plugins': ['auth.anonymous'],
'allow-anonymous': True
}
}
self._broker = Broker(config=hbmqtt_config, loop=self._loop)
self._thread = threading.Thread(target=self._run_loop)
self._started_ev = threading.Event()
def start(self):
"""Start the broker."""
self._thread.start()
self._started_ev.wait()
def stop(self):
"""Stop the broker."""
self._loop.call_soon_threadsafe(asyncio.async, self._broker.shutdown())
self._loop.call_soon_threadsafe(self._loop.stop)
self._thread.join()
def _run_loop(self):
"""Run the loop."""
asyncio.set_event_loop(self._loop)
self._loop.run_until_complete(self._broker_coroutine())
self._started_ev.set()
self._loop.run_forever()
self._loop.close()
@asyncio.coroutine
def _broker_coroutine(self):
"""The Broker coroutine."""
yield from self._broker.start()