mirror of
https://github.com/home-assistant/core.git
synced 2025-04-24 01:08:12 +00:00
Completely local control of entities via Alexa (#2942)
* Initial code for alexa_local_control. * Added support for creating a dummy username. * Move SSDP responses to local variables. * Added config validation via Voluptuous. * Modify and remove unnecessary returned emulated bridge values. * Remove script and scene domains from default exposed domains. * Replaced Flask with HomeAssistantWSGI. * Fix lint errors. * Correcting grammar and spelling in docs and comments. * Rename alexa_local_control to emulated_hue. * Rename emulated_hue attributes. * Fix a bug where something marked not exposed is exposed by default. * Make sure the UPNP responder thread cleanly stops when HASS stops. Also fix some config loading and lint errors. * Fixed unexposed entities still having individual state exposed. * Started writing tests for emulated_hue. * Fix being able to set state of non-exposed entity. * Another test for emulated_hue. * More tests for emulated_hue. Also slightly simplified emulated_hue's PUT handler. * Fix bad test, sorry :/ * Third time's the charm. * Fix lint and value validation tests. * Rename emulated_hue bridge name. * Remove license and documentation from header. * Combine two if statements. * Style changes. * Fixed various issues and added some constants
This commit is contained in:
parent
a4b8c3cab0
commit
c05d27d214
542
homeassistant/components/emulated_hue.py
Executable file
542
homeassistant/components/emulated_hue.py
Executable file
@ -0,0 +1,542 @@
|
||||
"""
|
||||
Support for local control of entities by emulating the Phillips Hue bridge.
|
||||
|
||||
For more details about this component, please refer to the documentation at
|
||||
https://home-assistant.io/components/emulated_hue/
|
||||
"""
|
||||
import threading
|
||||
import socket
|
||||
import logging
|
||||
import json
|
||||
import os
|
||||
import select
|
||||
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant import util, core
|
||||
from homeassistant.const import (
|
||||
ATTR_ENTITY_ID, ATTR_FRIENDLY_NAME, SERVICE_TURN_OFF, SERVICE_TURN_ON,
|
||||
EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP,
|
||||
STATE_ON
|
||||
)
|
||||
from homeassistant.components.light import (
|
||||
ATTR_BRIGHTNESS, ATTR_SUPPORTED_FEATURES, SUPPORT_BRIGHTNESS
|
||||
)
|
||||
from homeassistant.components.http import (
|
||||
HomeAssistantView, HomeAssistantWSGI
|
||||
)
|
||||
import homeassistant.helpers.config_validation as cv
|
||||
|
||||
DOMAIN = 'emulated_hue'
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
CONF_HOST_IP = 'host_ip'
|
||||
CONF_LISTEN_PORT = 'listen_port'
|
||||
CONF_OFF_MAPS_TO_ON_DOMAINS = 'off_maps_to_on_domains'
|
||||
CONF_EXPOSE_BY_DEFAULT = 'expose_by_default'
|
||||
CONF_EXPOSED_DOMAINS = 'exposed_domains'
|
||||
|
||||
ATTR_EMULATED_HUE = 'emulated_hue'
|
||||
ATTR_EMULATED_HUE_NAME = 'emulated_hue_name'
|
||||
|
||||
DEFAULT_LISTEN_PORT = 8300
|
||||
DEFAULT_OFF_MAPS_TO_ON_DOMAINS = ['script', 'scene']
|
||||
DEFAULT_EXPOSE_BY_DEFAULT = True
|
||||
DEFAULT_EXPOSED_DOMAINS = [
|
||||
'switch', 'light', 'group', 'input_boolean', 'media_player'
|
||||
]
|
||||
|
||||
HUE_API_STATE_ON = 'on'
|
||||
HUE_API_STATE_BRI = 'bri'
|
||||
|
||||
CONFIG_SCHEMA = vol.Schema({
|
||||
DOMAIN: vol.Schema({
|
||||
vol.Optional(CONF_HOST_IP): cv.string,
|
||||
vol.Optional(CONF_LISTEN_PORT, default=DEFAULT_LISTEN_PORT):
|
||||
vol.All(vol.Coerce(int), vol.Range(min=1, max=65535)),
|
||||
vol.Optional(CONF_OFF_MAPS_TO_ON_DOMAINS): cv.ensure_list,
|
||||
vol.Optional(CONF_EXPOSE_BY_DEFAULT): cv.boolean,
|
||||
vol.Optional(CONF_EXPOSED_DOMAINS): cv.ensure_list
|
||||
})
|
||||
}, extra=vol.ALLOW_EXTRA)
|
||||
|
||||
|
||||
def setup(hass, yaml_config):
|
||||
"""Activate the emulated_hue component."""
|
||||
config = Config(yaml_config)
|
||||
|
||||
server = HomeAssistantWSGI(
|
||||
hass,
|
||||
development=False,
|
||||
server_host=config.host_ip_addr,
|
||||
server_port=config.listen_port,
|
||||
api_password=None,
|
||||
ssl_certificate=None,
|
||||
ssl_key=None,
|
||||
cors_origins=[]
|
||||
)
|
||||
|
||||
server.register_view(DescriptionXmlView(hass, config))
|
||||
server.register_view(HueUsernameView(hass))
|
||||
server.register_view(HueLightsView(hass, config))
|
||||
|
||||
upnp_listener = UPNPResponderThread(
|
||||
config.host_ip_addr, config.listen_port)
|
||||
|
||||
def start_emulated_hue_bridge(event):
|
||||
"""Start the emulated hue bridge."""
|
||||
server.start()
|
||||
upnp_listener.start()
|
||||
|
||||
hass.bus.listen_once(EVENT_HOMEASSISTANT_START, start_emulated_hue_bridge)
|
||||
|
||||
def stop_emulated_hue_bridge(event):
|
||||
"""Stop the emulated hue bridge."""
|
||||
upnp_listener.stop()
|
||||
server.stop()
|
||||
|
||||
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, stop_emulated_hue_bridge)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
# pylint: disable=too-few-public-methods
|
||||
class Config(object):
|
||||
"""Holds configuration variables for the emulated hue bridge."""
|
||||
|
||||
def __init__(self, yaml_config):
|
||||
"""Initialize the instance."""
|
||||
conf = yaml_config.get(DOMAIN, {})
|
||||
|
||||
# Get the IP address that will be passed to the Echo during discovery
|
||||
self.host_ip_addr = conf.get(CONF_HOST_IP)
|
||||
if self.host_ip_addr is None:
|
||||
self.host_ip_addr = util.get_local_ip()
|
||||
_LOGGER.warning(
|
||||
"Listen IP address not specified, auto-detected address is %s",
|
||||
self.host_ip_addr)
|
||||
|
||||
# Get the port that the Hue bridge will listen on
|
||||
self.listen_port = conf.get(CONF_LISTEN_PORT)
|
||||
if not isinstance(self.listen_port, int):
|
||||
self.listen_port = DEFAULT_LISTEN_PORT
|
||||
_LOGGER.warning(
|
||||
"Listen port not specified, defaulting to %s",
|
||||
self.listen_port)
|
||||
|
||||
# Get domains that cause both "on" and "off" commands to map to "on"
|
||||
# This is primarily useful for things like scenes or scripts, which
|
||||
# don't really have a concept of being off
|
||||
self.off_maps_to_on_domains = conf.get(CONF_OFF_MAPS_TO_ON_DOMAINS)
|
||||
if not isinstance(self.off_maps_to_on_domains, list):
|
||||
self.off_maps_to_on_domains = DEFAULT_OFF_MAPS_TO_ON_DOMAINS
|
||||
|
||||
# Get whether or not entities should be exposed by default, or if only
|
||||
# explicitly marked ones will be exposed
|
||||
self.expose_by_default = conf.get(
|
||||
CONF_EXPOSE_BY_DEFAULT, DEFAULT_EXPOSE_BY_DEFAULT)
|
||||
|
||||
# Get domains that are exposed by default when expose_by_default is
|
||||
# True
|
||||
self.exposed_domains = conf.get(
|
||||
CONF_EXPOSED_DOMAINS, DEFAULT_EXPOSED_DOMAINS)
|
||||
|
||||
|
||||
class DescriptionXmlView(HomeAssistantView):
|
||||
"""Handles requests for the description.xml file."""
|
||||
|
||||
url = '/description.xml'
|
||||
name = 'description:xml'
|
||||
requires_auth = False
|
||||
|
||||
def __init__(self, hass, config):
|
||||
"""Initialize the instance of the view."""
|
||||
super().__init__(hass)
|
||||
self.config = config
|
||||
|
||||
def get(self, request):
|
||||
"""Handle a GET request."""
|
||||
xml_template = """<?xml version="1.0" encoding="UTF-8" ?>
|
||||
<root xmlns="urn:schemas-upnp-org:device-1-0">
|
||||
<specVersion>
|
||||
<major>1</major>
|
||||
<minor>0</minor>
|
||||
</specVersion>
|
||||
<URLBase>http://{0}:{1}/</URLBase>
|
||||
<device>
|
||||
<deviceType>urn:schemas-upnp-org:device:Basic:1</deviceType>
|
||||
<friendlyName>HASS Bridge ({0})</friendlyName>
|
||||
<manufacturer>Royal Philips Electronics</manufacturer>
|
||||
<manufacturerURL>http://www.philips.com</manufacturerURL>
|
||||
<modelDescription>Philips hue Personal Wireless Lighting</modelDescription>
|
||||
<modelName>Philips hue bridge 2015</modelName>
|
||||
<modelNumber>BSB002</modelNumber>
|
||||
<modelURL>http://www.meethue.com</modelURL>
|
||||
<serialNumber>1234</serialNumber>
|
||||
<UDN>uuid:2f402f80-da50-11e1-9b23-001788255acc</UDN>
|
||||
</device>
|
||||
</root>
|
||||
"""
|
||||
|
||||
resp_text = xml_template.format(
|
||||
self.config.host_ip_addr, self.config.listen_port)
|
||||
|
||||
return self.Response(resp_text, mimetype='text/xml')
|
||||
|
||||
|
||||
class HueUsernameView(HomeAssistantView):
|
||||
"""Handle requests to create a username for the emulated hue bridge."""
|
||||
|
||||
url = '/api'
|
||||
name = 'hue:api'
|
||||
requires_auth = False
|
||||
|
||||
def __init__(self, hass):
|
||||
"""Initialize the instance of the view."""
|
||||
super().__init__(hass)
|
||||
|
||||
def post(self, request):
|
||||
"""Handle a POST request."""
|
||||
data = request.json
|
||||
|
||||
if 'devicetype' not in data:
|
||||
return self.Response("devicetype not specified", status=400)
|
||||
|
||||
json_response = [{'success': {'username': '12345678901234567890'}}]
|
||||
|
||||
return self.json(json_response)
|
||||
|
||||
|
||||
class HueLightsView(HomeAssistantView):
|
||||
"""Handle requests for getting and setting info about entities."""
|
||||
|
||||
url = '/api/<username>/lights'
|
||||
name = 'api:username:lights'
|
||||
extra_urls = ['/api/<username>/lights/<entity_id>',
|
||||
'/api/<username>/lights/<entity_id>/state']
|
||||
requires_auth = False
|
||||
|
||||
def __init__(self, hass, config):
|
||||
"""Initialize the instance of the view."""
|
||||
super().__init__(hass)
|
||||
self.config = config
|
||||
self.cached_states = {}
|
||||
|
||||
def get(self, request, username, entity_id=None):
|
||||
"""Handle a GET request."""
|
||||
if entity_id is None:
|
||||
return self.get_lights_list()
|
||||
|
||||
if not request.base_url.endswith('state'):
|
||||
return self.get_light_state(entity_id)
|
||||
|
||||
return self.Response("Method not allowed", status=405)
|
||||
|
||||
def put(self, request, username, entity_id=None):
|
||||
"""Handle a PUT request."""
|
||||
if not request.base_url.endswith('state'):
|
||||
return self.Response("Method not allowed", status=405)
|
||||
|
||||
content_type = request.environ.get('CONTENT_TYPE', '')
|
||||
if content_type == 'application/x-www-form-urlencoded':
|
||||
# Alexa sends JSON data with a form data content type, for
|
||||
# whatever reason, and Werkzeug parses form data automatically,
|
||||
# so we need to do some gymnastics to get the data we need
|
||||
json_data = None
|
||||
|
||||
for key in request.form:
|
||||
try:
|
||||
json_data = json.loads(key)
|
||||
break
|
||||
except ValueError:
|
||||
# Try the next key?
|
||||
pass
|
||||
|
||||
if json_data is None:
|
||||
return self.Response("Bad request", status=400)
|
||||
else:
|
||||
json_data = request.json
|
||||
|
||||
return self.put_light_state(json_data, entity_id)
|
||||
|
||||
def get_lights_list(self):
|
||||
"""Process a request to get the list of available lights."""
|
||||
json_response = {}
|
||||
|
||||
for entity in self.hass.states.all():
|
||||
if self.is_entity_exposed(entity):
|
||||
json_response[entity.entity_id] = entity_to_json(entity)
|
||||
|
||||
return self.json(json_response)
|
||||
|
||||
def get_light_state(self, entity_id):
|
||||
"""Process a request to get the state of an individual light."""
|
||||
entity = self.hass.states.get(entity_id)
|
||||
if entity is None or not self.is_entity_exposed(entity):
|
||||
return self.Response("Entity not found", status=404)
|
||||
|
||||
cached_state = self.cached_states.get(entity_id, None)
|
||||
|
||||
if cached_state is None:
|
||||
final_state = entity.state == STATE_ON
|
||||
final_brightness = entity.attributes.get(
|
||||
ATTR_BRIGHTNESS, 255 if final_state else 0)
|
||||
else:
|
||||
final_state, final_brightness = cached_state
|
||||
|
||||
json_response = entity_to_json(entity, final_state, final_brightness)
|
||||
|
||||
return self.json(json_response)
|
||||
|
||||
def put_light_state(self, request_json, entity_id):
|
||||
"""Process a request to set the state of an individual light."""
|
||||
config = self.config
|
||||
|
||||
# Retrieve the entity from the state machine
|
||||
entity = self.hass.states.get(entity_id)
|
||||
if entity is None:
|
||||
return self.Response("Entity not found", status=404)
|
||||
|
||||
if not self.is_entity_exposed(entity):
|
||||
return self.Response("Entity not found", status=404)
|
||||
|
||||
# Parse the request into requested "on" status and brightness
|
||||
parsed = parse_hue_api_put_light_body(request_json, entity)
|
||||
|
||||
if parsed is None:
|
||||
return self.Response("Bad request", status=400)
|
||||
|
||||
result, brightness = parsed
|
||||
|
||||
# Convert the resulting "on" status into the service we need to call
|
||||
service = SERVICE_TURN_ON if result else SERVICE_TURN_OFF
|
||||
|
||||
# Construct what we need to send to the service
|
||||
data = {ATTR_ENTITY_ID: entity_id}
|
||||
|
||||
if brightness is not None:
|
||||
data[ATTR_BRIGHTNESS] = brightness
|
||||
|
||||
if entity.domain.lower() in config.off_maps_to_on_domains:
|
||||
# Map the off command to on
|
||||
service = SERVICE_TURN_ON
|
||||
|
||||
# Caching is required because things like scripts and scenes won't
|
||||
# report as "off" to Alexa if an "off" command is received, because
|
||||
# they'll map to "on". Thus, instead of reporting its actual
|
||||
# status, we report what Alexa will want to see, which is the same
|
||||
# as the actual requested command.
|
||||
self.cached_states[entity_id] = (result, brightness)
|
||||
|
||||
# Perform the requested action
|
||||
self.hass.services.call(core.DOMAIN, service, data, blocking=True)
|
||||
|
||||
json_response = \
|
||||
[create_hue_success_response(entity_id, HUE_API_STATE_ON, result)]
|
||||
|
||||
if brightness is not None:
|
||||
json_response.append(create_hue_success_response(
|
||||
entity_id, HUE_API_STATE_BRI, brightness))
|
||||
|
||||
return self.json(json_response)
|
||||
|
||||
def is_entity_exposed(self, entity):
|
||||
"""Determine if an entity should be exposed on the emulated bridge."""
|
||||
config = self.config
|
||||
|
||||
if entity.attributes.get('view') is not None:
|
||||
# Ignore entities that are views
|
||||
return False
|
||||
|
||||
domain = entity.domain.lower()
|
||||
explicit_expose = entity.attributes.get(ATTR_EMULATED_HUE, None)
|
||||
|
||||
domain_exposed_by_default = \
|
||||
config.expose_by_default and domain in config.exposed_domains
|
||||
|
||||
# Expose an entity if the entity's domain is exposed by default and
|
||||
# the configuration doesn't explicitly exclude it from being
|
||||
# exposed, or if the entity is explicitly exposed
|
||||
is_default_exposed = \
|
||||
domain_exposed_by_default and explicit_expose is not False
|
||||
|
||||
return is_default_exposed or explicit_expose
|
||||
|
||||
|
||||
def parse_hue_api_put_light_body(request_json, entity):
|
||||
"""Parse the body of a request to change the state of a light."""
|
||||
if HUE_API_STATE_ON in request_json:
|
||||
if not isinstance(request_json[HUE_API_STATE_ON], bool):
|
||||
return None
|
||||
|
||||
if request_json['on']:
|
||||
# Echo requested device be turned on
|
||||
brightness = None
|
||||
report_brightness = False
|
||||
result = True
|
||||
else:
|
||||
# Echo requested device be turned off
|
||||
brightness = None
|
||||
report_brightness = False
|
||||
result = False
|
||||
|
||||
if HUE_API_STATE_BRI in request_json:
|
||||
# Make sure the entity actually supports brightness
|
||||
entity_features = entity.attributes.get(ATTR_SUPPORTED_FEATURES, 0)
|
||||
|
||||
if (entity_features & SUPPORT_BRIGHTNESS) == SUPPORT_BRIGHTNESS:
|
||||
try:
|
||||
# Clamp brightness from 0 to 255
|
||||
brightness = \
|
||||
max(0, min(int(request_json[HUE_API_STATE_BRI]), 255))
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
report_brightness = True
|
||||
result = (brightness > 0)
|
||||
|
||||
return (result, brightness) if report_brightness else (result, None)
|
||||
|
||||
|
||||
def entity_to_json(entity, is_on=None, brightness=None):
|
||||
"""Convert an entity to its Hue bridge JSON representation."""
|
||||
if is_on is None:
|
||||
is_on = entity.state == STATE_ON
|
||||
|
||||
if brightness is None:
|
||||
brightness = 255 if is_on else 0
|
||||
|
||||
name = entity.attributes.get(
|
||||
ATTR_EMULATED_HUE_NAME, entity.attributes[ATTR_FRIENDLY_NAME])
|
||||
|
||||
return {
|
||||
'state':
|
||||
{
|
||||
HUE_API_STATE_ON: is_on,
|
||||
HUE_API_STATE_BRI: brightness,
|
||||
'reachable': True
|
||||
},
|
||||
'type': 'Dimmable light',
|
||||
'name': name,
|
||||
'modelid': 'HASS123',
|
||||
'uniqueid': entity.entity_id,
|
||||
'swversion': '123'
|
||||
}
|
||||
|
||||
|
||||
def create_hue_success_response(entity_id, attr, value):
|
||||
"""Create a success response for an attribute set on a light."""
|
||||
success_key = '/lights/{}/state/{}'.format(entity_id, attr)
|
||||
return {'success': {success_key: value}}
|
||||
|
||||
|
||||
class UPNPResponderThread(threading.Thread):
|
||||
"""Handle responding to UPNP/SSDP discovery requests."""
|
||||
|
||||
_interrupted = False
|
||||
|
||||
def __init__(self, host_ip_addr, listen_port):
|
||||
"""Initialize the class."""
|
||||
threading.Thread.__init__(self)
|
||||
|
||||
self.host_ip_addr = host_ip_addr
|
||||
self.listen_port = listen_port
|
||||
|
||||
# Note that the double newline at the end of
|
||||
# this string is required per the SSDP spec
|
||||
resp_template = """HTTP/1.1 200 OK
|
||||
CACHE-CONTROL: max-age=60
|
||||
EXT:
|
||||
LOCATION: http://{0}:{1}/description.xml
|
||||
SERVER: FreeRTOS/6.0.5, UPnP/1.0, IpBridge/0.1
|
||||
ST: urn:schemas-upnp-org:device:basic:1
|
||||
USN: uuid:Socket-1_0-221438K0100073::urn:schemas-upnp-org:device:basic:1
|
||||
|
||||
"""
|
||||
|
||||
self.upnp_response = resp_template.format(host_ip_addr, listen_port) \
|
||||
.replace("\n", "\r\n") \
|
||||
.encode('utf-8')
|
||||
|
||||
# Set up a pipe for signaling to the receiver that it's time to
|
||||
# shutdown. Essentially, we place the SSDP socket into nonblocking
|
||||
# mode and use select() to wait for data to arrive on either the SSDP
|
||||
# socket or the pipe. If data arrives on either one, select() returns
|
||||
# and tells us which filenos have data ready to read.
|
||||
#
|
||||
# When we want to stop the responder, we write data to the pipe, which
|
||||
# causes the select() to return and indicate that said pipe has data
|
||||
# ready to be read, which indicates to us that the responder needs to
|
||||
# be shutdown.
|
||||
self._interrupted_read_pipe, self._interrupted_write_pipe = os.pipe()
|
||||
|
||||
def run(self):
|
||||
"""Run the server."""
|
||||
# Listen for UDP port 1900 packets sent to SSDP multicast address
|
||||
ssdp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
ssdp_socket.setblocking(False)
|
||||
|
||||
# Required for receiving multicast
|
||||
ssdp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
|
||||
ssdp_socket.setsockopt(
|
||||
socket.SOL_IP,
|
||||
socket.IP_MULTICAST_IF,
|
||||
socket.inet_aton(self.host_ip_addr))
|
||||
|
||||
ssdp_socket.setsockopt(
|
||||
socket.SOL_IP,
|
||||
socket.IP_ADD_MEMBERSHIP,
|
||||
socket.inet_aton("239.255.255.250") +
|
||||
socket.inet_aton(self.host_ip_addr))
|
||||
|
||||
ssdp_socket.bind(("239.255.255.250", 1900))
|
||||
|
||||
while True:
|
||||
if self._interrupted:
|
||||
clean_socket_close(ssdp_socket)
|
||||
return
|
||||
|
||||
try:
|
||||
read, _, _ = select.select(
|
||||
[self._interrupted_read_pipe, ssdp_socket], [],
|
||||
[ssdp_socket])
|
||||
|
||||
if self._interrupted_read_pipe in read:
|
||||
# Implies self._interrupted is True
|
||||
clean_socket_close(ssdp_socket)
|
||||
return
|
||||
elif ssdp_socket in read:
|
||||
data, addr = ssdp_socket.recvfrom(1024)
|
||||
else:
|
||||
continue
|
||||
except socket.error as ex:
|
||||
if self._interrupted:
|
||||
clean_socket_close(ssdp_socket)
|
||||
return
|
||||
|
||||
_LOGGER.error("UPNP Responder socket exception occured: %s",
|
||||
ex.__str__)
|
||||
|
||||
if "M-SEARCH" in data.decode('utf-8'):
|
||||
# SSDP M-SEARCH method received, respond to it with our info
|
||||
resp_socket = socket.socket(
|
||||
socket.AF_INET, socket.SOCK_DGRAM)
|
||||
|
||||
resp_socket.sendto(self.upnp_response, addr)
|
||||
resp_socket.close()
|
||||
|
||||
def stop(self):
|
||||
"""Stop the server."""
|
||||
# Request for server
|
||||
self._interrupted = True
|
||||
os.write(self._interrupted_write_pipe, bytes([0]))
|
||||
self.join()
|
||||
|
||||
|
||||
def clean_socket_close(sock):
|
||||
"""Close a socket connection and logs its closure."""
|
||||
_LOGGER.info("UPNP responder shutting down.")
|
||||
|
||||
sock.close()
|
445
tests/components/test_emulated_hue.py
Executable file
445
tests/components/test_emulated_hue.py
Executable file
@ -0,0 +1,445 @@
|
||||
import time
|
||||
import json
|
||||
import threading
|
||||
import asyncio
|
||||
|
||||
import unittest
|
||||
import requests
|
||||
|
||||
from homeassistant import bootstrap, const, core
|
||||
import homeassistant.components as core_components
|
||||
from homeassistant.components import emulated_hue, http, light, mqtt
|
||||
from homeassistant.const import STATE_ON, STATE_OFF
|
||||
from homeassistant.components.emulated_hue import (
|
||||
HUE_API_STATE_ON, HUE_API_STATE_BRI
|
||||
)
|
||||
|
||||
from tests.common import get_test_instance_port, get_test_home_assistant
|
||||
|
||||
HTTP_SERVER_PORT = get_test_instance_port()
|
||||
BRIDGE_SERVER_PORT = get_test_instance_port()
|
||||
MQTT_BROKER_PORT = get_test_instance_port()
|
||||
|
||||
BRIDGE_URL_BASE = "http://127.0.0.1:{}".format(BRIDGE_SERVER_PORT) + "{}"
|
||||
JSON_HEADERS = {const.HTTP_HEADER_CONTENT_TYPE: const.CONTENT_TYPE_JSON}
|
||||
|
||||
mqtt_broker = None
|
||||
|
||||
|
||||
def setUpModule():
|
||||
global mqtt_broker
|
||||
|
||||
mqtt_broker = MQTTBroker('127.0.0.1', MQTT_BROKER_PORT)
|
||||
mqtt_broker.start()
|
||||
|
||||
|
||||
def tearDownModule():
|
||||
global mqtt_broker
|
||||
|
||||
mqtt_broker.stop()
|
||||
|
||||
|
||||
def setup_hass_instance(emulated_hue_config):
|
||||
hass = get_test_home_assistant()
|
||||
|
||||
# We need to do this to get access to homeassistant/turn_(on,off)
|
||||
core_components.setup(hass, {core.DOMAIN: {}})
|
||||
|
||||
bootstrap.setup_component(
|
||||
hass, http.DOMAIN,
|
||||
{http.DOMAIN: {http.CONF_SERVER_PORT: HTTP_SERVER_PORT}})
|
||||
|
||||
bootstrap.setup_component(hass, emulated_hue.DOMAIN, emulated_hue_config)
|
||||
|
||||
return hass
|
||||
|
||||
|
||||
def start_hass_instance(hass):
|
||||
hass.start()
|
||||
time.sleep(0.05)
|
||||
|
||||
|
||||
class TestEmulatedHue(unittest.TestCase):
|
||||
hass = None
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
cls.hass = setup_hass_instance({
|
||||
emulated_hue.DOMAIN: {
|
||||
emulated_hue.CONF_LISTEN_PORT: BRIDGE_SERVER_PORT
|
||||
}})
|
||||
|
||||
start_hass_instance(cls.hass)
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
cls.hass.stop()
|
||||
|
||||
def test_description_xml(self):
|
||||
import xml.etree.ElementTree as ET
|
||||
|
||||
result = requests.get(
|
||||
BRIDGE_URL_BASE.format('/description.xml'), timeout=5)
|
||||
|
||||
self.assertEqual(result.status_code, 200)
|
||||
self.assertTrue('text/xml' in result.headers['content-type'])
|
||||
|
||||
# Make sure the XML is parsable
|
||||
try:
|
||||
ET.fromstring(result.text)
|
||||
except:
|
||||
self.fail('description.xml is not valid XML!')
|
||||
|
||||
def test_create_username(self):
|
||||
request_json = {'devicetype': 'my_device'}
|
||||
|
||||
result = requests.post(
|
||||
BRIDGE_URL_BASE.format('/api'), data=json.dumps(request_json),
|
||||
timeout=5)
|
||||
|
||||
self.assertEqual(result.status_code, 200)
|
||||
self.assertTrue('application/json' in result.headers['content-type'])
|
||||
|
||||
resp_json = result.json()
|
||||
success_json = resp_json[0]
|
||||
|
||||
self.assertTrue('success' in success_json)
|
||||
self.assertTrue('username' in success_json['success'])
|
||||
|
||||
def test_valid_username_request(self):
|
||||
request_json = {'invalid_key': 'my_device'}
|
||||
|
||||
result = requests.post(
|
||||
BRIDGE_URL_BASE.format('/api'), data=json.dumps(request_json),
|
||||
timeout=5)
|
||||
|
||||
self.assertEqual(result.status_code, 400)
|
||||
|
||||
|
||||
class TestEmulatedHueExposedByDefault(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
cls.hass = setup_hass_instance({
|
||||
emulated_hue.DOMAIN: {
|
||||
emulated_hue.CONF_LISTEN_PORT: BRIDGE_SERVER_PORT,
|
||||
emulated_hue.CONF_EXPOSE_BY_DEFAULT: True
|
||||
}
|
||||
})
|
||||
|
||||
bootstrap.setup_component(cls.hass, mqtt.DOMAIN, {
|
||||
'mqtt': {
|
||||
'broker': '127.0.0.1',
|
||||
'port': MQTT_BROKER_PORT
|
||||
}
|
||||
})
|
||||
|
||||
bootstrap.setup_component(cls.hass, light.DOMAIN, {
|
||||
'light': [
|
||||
{
|
||||
'platform': 'mqtt',
|
||||
'name': 'Office light',
|
||||
'state_topic': 'office/rgb1/light/status',
|
||||
'command_topic': 'office/rgb1/light/switch',
|
||||
'brightness_state_topic': 'office/rgb1/brightness/status',
|
||||
'brightness_command_topic': 'office/rgb1/brightness/set',
|
||||
'optimistic': True
|
||||
},
|
||||
{
|
||||
'platform': 'mqtt',
|
||||
'name': 'Bedroom light',
|
||||
'state_topic': 'bedroom/rgb1/light/status',
|
||||
'command_topic': 'bedroom/rgb1/light/switch',
|
||||
'brightness_state_topic': 'bedroom/rgb1/brightness/status',
|
||||
'brightness_command_topic': 'bedroom/rgb1/brightness/set',
|
||||
'optimistic': True
|
||||
},
|
||||
{
|
||||
'platform': 'mqtt',
|
||||
'name': 'Kitchen light',
|
||||
'state_topic': 'kitchen/rgb1/light/status',
|
||||
'command_topic': 'kitchen/rgb1/light/switch',
|
||||
'brightness_state_topic': 'kitchen/rgb1/brightness/status',
|
||||
'brightness_command_topic': 'kitchen/rgb1/brightness/set',
|
||||
'optimistic': True
|
||||
}
|
||||
]
|
||||
})
|
||||
|
||||
start_hass_instance(cls.hass)
|
||||
|
||||
# Kitchen light is explicitly excluded from being exposed
|
||||
kitchen_light_entity = cls.hass.states.get('light.kitchen_light')
|
||||
attrs = dict(kitchen_light_entity.attributes)
|
||||
attrs[emulated_hue.ATTR_EMULATED_HUE] = False
|
||||
cls.hass.states.set(
|
||||
kitchen_light_entity.entity_id, kitchen_light_entity.state,
|
||||
attributes=attrs)
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
cls.hass.stop()
|
||||
|
||||
def test_discover_lights(self):
|
||||
result = requests.get(
|
||||
BRIDGE_URL_BASE.format('/api/username/lights'), timeout=5)
|
||||
|
||||
self.assertEqual(result.status_code, 200)
|
||||
self.assertTrue('application/json' in result.headers['content-type'])
|
||||
|
||||
result_json = result.json()
|
||||
|
||||
# Make sure the lights we added to the config are there
|
||||
self.assertTrue('light.office_light' in result_json)
|
||||
self.assertTrue('light.bedroom_light' in result_json)
|
||||
self.assertTrue('light.kitchen_light' not in result_json)
|
||||
|
||||
def test_get_light_state(self):
|
||||
# Turn office light on and set to 127 brightness
|
||||
self.hass.services.call(
|
||||
light.DOMAIN, const.SERVICE_TURN_ON,
|
||||
{
|
||||
const.ATTR_ENTITY_ID: 'light.office_light',
|
||||
light.ATTR_BRIGHTNESS: 127
|
||||
},
|
||||
blocking=True)
|
||||
|
||||
office_json = self.perform_get_light_state('light.office_light', 200)
|
||||
|
||||
self.assertEqual(office_json['state'][HUE_API_STATE_ON], True)
|
||||
self.assertEqual(office_json['state'][HUE_API_STATE_BRI], 127)
|
||||
|
||||
# Turn bedroom light off
|
||||
self.hass.services.call(
|
||||
light.DOMAIN, const.SERVICE_TURN_OFF,
|
||||
{
|
||||
const.ATTR_ENTITY_ID: 'light.bedroom_light'
|
||||
},
|
||||
blocking=True)
|
||||
|
||||
bedroom_json = self.perform_get_light_state('light.bedroom_light', 200)
|
||||
|
||||
self.assertEqual(bedroom_json['state'][HUE_API_STATE_ON], False)
|
||||
self.assertEqual(bedroom_json['state'][HUE_API_STATE_BRI], 0)
|
||||
|
||||
# Make sure kitchen light isn't accessible
|
||||
kitchen_url = '/api/username/lights/{}'.format('light.kitchen_light')
|
||||
kitchen_result = requests.get(
|
||||
BRIDGE_URL_BASE.format(kitchen_url), timeout=5)
|
||||
|
||||
self.assertEqual(kitchen_result.status_code, 404)
|
||||
|
||||
def test_put_light_state(self):
|
||||
self.perform_put_test_on_office_light()
|
||||
|
||||
# Turn the bedroom light on first
|
||||
self.hass.services.call(
|
||||
light.DOMAIN, const.SERVICE_TURN_ON,
|
||||
{const.ATTR_ENTITY_ID: 'light.bedroom_light',
|
||||
light.ATTR_BRIGHTNESS: 153},
|
||||
blocking=True)
|
||||
|
||||
bedroom_light = self.hass.states.get('light.bedroom_light')
|
||||
self.assertEqual(bedroom_light.state, STATE_ON)
|
||||
self.assertEqual(bedroom_light.attributes[light.ATTR_BRIGHTNESS], 153)
|
||||
|
||||
# Go through the API to turn it off
|
||||
bedroom_result = self.perform_put_light_state(
|
||||
'light.bedroom_light', False)
|
||||
|
||||
bedroom_result_json = bedroom_result.json()
|
||||
|
||||
self.assertEqual(bedroom_result.status_code, 200)
|
||||
self.assertTrue(
|
||||
'application/json' in bedroom_result.headers['content-type'])
|
||||
|
||||
self.assertEqual(len(bedroom_result_json), 1)
|
||||
|
||||
# Check to make sure the state changed
|
||||
bedroom_light = self.hass.states.get('light.bedroom_light')
|
||||
self.assertEqual(bedroom_light.state, STATE_OFF)
|
||||
|
||||
# Make sure we can't change the kitchen light state
|
||||
kitchen_result = self.perform_put_light_state(
|
||||
'light.kitchen_light', True)
|
||||
self.assertEqual(kitchen_result.status_code, 404)
|
||||
|
||||
def test_put_with_form_urlencoded_content_type(self):
|
||||
# Needed for Alexa
|
||||
self.perform_put_test_on_office_light(
|
||||
'application/x-www-form-urlencoded')
|
||||
|
||||
# Make sure we fail gracefully when we can't parse the data
|
||||
data = {'key1': 'value1', 'key2': 'value2'}
|
||||
result = requests.put(
|
||||
BRIDGE_URL_BASE.format(
|
||||
'/api/username/lights/{}/state'.format("light.office_light")),
|
||||
data=data)
|
||||
|
||||
self.assertEqual(result.status_code, 400)
|
||||
|
||||
def test_entity_not_found(self):
|
||||
result = requests.get(
|
||||
BRIDGE_URL_BASE.format(
|
||||
'/api/username/lights/{}'.format("not.existant_entity")),
|
||||
timeout=5)
|
||||
|
||||
self.assertEqual(result.status_code, 404)
|
||||
|
||||
result = requests.put(
|
||||
BRIDGE_URL_BASE.format(
|
||||
'/api/username/lights/{}/state'.format("non.existant_entity")),
|
||||
timeout=5)
|
||||
|
||||
self.assertEqual(result.status_code, 404)
|
||||
|
||||
def test_allowed_methods(self):
|
||||
result = requests.get(
|
||||
BRIDGE_URL_BASE.format(
|
||||
'/api/username/lights/{}/state'.format("light.office_light")))
|
||||
|
||||
self.assertEqual(result.status_code, 405)
|
||||
|
||||
result = requests.put(
|
||||
BRIDGE_URL_BASE.format(
|
||||
'/api/username/lights/{}'.format("light.office_light")),
|
||||
data={'key1': 'value1'})
|
||||
|
||||
self.assertEqual(result.status_code, 405)
|
||||
|
||||
result = requests.put(
|
||||
BRIDGE_URL_BASE.format('/api/username/lights'),
|
||||
data={'key1': 'value1'})
|
||||
|
||||
self.assertEqual(result.status_code, 405)
|
||||
|
||||
def test_proper_put_state_request(self):
|
||||
# Test proper on value parsing
|
||||
result = requests.put(
|
||||
BRIDGE_URL_BASE.format(
|
||||
'/api/username/lights/{}/state'.format("light.office_light")),
|
||||
data=json.dumps({HUE_API_STATE_ON: 1234}))
|
||||
|
||||
self.assertEqual(result.status_code, 400)
|
||||
|
||||
# Test proper brightness value parsing
|
||||
result = requests.put(
|
||||
BRIDGE_URL_BASE.format(
|
||||
'/api/username/lights/{}/state'.format("light.office_light")),
|
||||
data=json.dumps({
|
||||
HUE_API_STATE_ON: True,
|
||||
HUE_API_STATE_BRI: 'Hello world!'
|
||||
}))
|
||||
|
||||
self.assertEqual(result.status_code, 400)
|
||||
|
||||
def perform_put_test_on_office_light(self,
|
||||
content_type='application/json'):
|
||||
# Turn the office light off first
|
||||
self.hass.services.call(
|
||||
light.DOMAIN, const.SERVICE_TURN_OFF,
|
||||
{const.ATTR_ENTITY_ID: 'light.office_light'},
|
||||
blocking=True)
|
||||
|
||||
office_light = self.hass.states.get('light.office_light')
|
||||
self.assertEqual(office_light.state, STATE_OFF)
|
||||
|
||||
# Go through the API to turn it on
|
||||
office_result = self.perform_put_light_state(
|
||||
'light.office_light', True, 56, content_type)
|
||||
|
||||
office_result_json = office_result.json()
|
||||
|
||||
self.assertEqual(office_result.status_code, 200)
|
||||
self.assertTrue(
|
||||
'application/json' in office_result.headers['content-type'])
|
||||
|
||||
self.assertEqual(len(office_result_json), 2)
|
||||
|
||||
# Check to make sure the state changed
|
||||
office_light = self.hass.states.get('light.office_light')
|
||||
self.assertEqual(office_light.state, STATE_ON)
|
||||
self.assertEqual(office_light.attributes[light.ATTR_BRIGHTNESS], 56)
|
||||
|
||||
def perform_get_light_state(self, entity_id, expected_status):
|
||||
result = requests.get(
|
||||
BRIDGE_URL_BASE.format(
|
||||
'/api/username/lights/{}'.format(entity_id)), timeout=5)
|
||||
|
||||
self.assertEqual(result.status_code, expected_status)
|
||||
|
||||
if expected_status == 200:
|
||||
self.assertTrue(
|
||||
'application/json' in result.headers['content-type'])
|
||||
|
||||
return result.json()
|
||||
|
||||
return None
|
||||
|
||||
def perform_put_light_state(self, entity_id, is_on, brightness=None,
|
||||
content_type='application/json'):
|
||||
url = BRIDGE_URL_BASE.format(
|
||||
'/api/username/lights/{}/state'.format(entity_id))
|
||||
|
||||
req_headers = {'Content-Type': content_type}
|
||||
|
||||
data = {HUE_API_STATE_ON: is_on}
|
||||
|
||||
if brightness is not None:
|
||||
data[HUE_API_STATE_BRI] = brightness
|
||||
|
||||
result = requests.put(
|
||||
url, data=json.dumps(data), timeout=5, headers=req_headers)
|
||||
return result
|
||||
|
||||
|
||||
class MQTTBroker(object):
|
||||
"""Encapsulates an embedded MQTT broker."""
|
||||
|
||||
def __init__(self, host, port):
|
||||
"""Initialize a new instance."""
|
||||
from hbmqtt.broker import Broker
|
||||
|
||||
self._loop = asyncio.new_event_loop()
|
||||
|
||||
hbmqtt_config = {
|
||||
'listeners': {
|
||||
'default': {
|
||||
'max-connections': 50000,
|
||||
'type': 'tcp',
|
||||
'bind': '{}:{}'.format(host, port)
|
||||
}
|
||||
},
|
||||
'auth': {
|
||||
'plugins': ['auth.anonymous'],
|
||||
'allow-anonymous': True
|
||||
}
|
||||
}
|
||||
|
||||
self._broker = Broker(config=hbmqtt_config, loop=self._loop)
|
||||
|
||||
self._thread = threading.Thread(target=self._run_loop)
|
||||
self._started_ev = threading.Event()
|
||||
|
||||
def start(self):
|
||||
"""Start the broker."""
|
||||
self._thread.start()
|
||||
self._started_ev.wait()
|
||||
|
||||
def stop(self):
|
||||
"""Stop the broker."""
|
||||
self._loop.call_soon_threadsafe(asyncio.async, self._broker.shutdown())
|
||||
self._loop.call_soon_threadsafe(self._loop.stop)
|
||||
self._thread.join()
|
||||
|
||||
def _run_loop(self):
|
||||
asyncio.set_event_loop(self._loop)
|
||||
self._loop.run_until_complete(self._broker_coroutine())
|
||||
|
||||
self._started_ev.set()
|
||||
|
||||
self._loop.run_forever()
|
||||
self._loop.close()
|
||||
|
||||
@asyncio.coroutine
|
||||
def _broker_coroutine(self):
|
||||
yield from self._broker.start()
|
Loading…
x
Reference in New Issue
Block a user