HTTP API is now more RESTful

This commit is contained in:
Paulus Schoutsen 2014-10-17 00:17:02 -07:00
parent 951c3683b2
commit 001f27cdb4
4 changed files with 209 additions and 290 deletions

View File

@ -475,9 +475,8 @@ class StateMachine(object):
return list(self._states.keys()) return list(self._states.keys())
def all(self): def all(self):
""" Returns a dict mapping all entity_ids to their state. """ """ Returns a list of all states. """
return {entity_id: state.copy() for entity_id, state return [state.copy() for state in self._states.values()]
in self._states.items()}
def get(self, entity_id): def get(self, entity_id):
""" Returns the state of the specified entity. """ """ Returns the state of the specified entity. """

View File

@ -27,13 +27,10 @@ Example result:
/api/states - GET /api/states - GET
Returns a list of entities for which a state is available Returns a list of entities for which a state is available
Example result: Example result:
{ [
"entity_ids": [ { .. state object .. },
"Paulus_Nexus_4", { .. state object .. }
"weather.sun", ]
"all_devices"
]
}
/api/states/<entity_id> - GET /api/states/<entity_id> - GET
Returns the current state from an entity Returns the current state from an entity
@ -102,9 +99,6 @@ HTTP_METHOD_NOT_ALLOWED = 405
HTTP_UNPROCESSABLE_ENTITY = 422 HTTP_UNPROCESSABLE_ENTITY = 422
URL_ROOT = "/" URL_ROOT = "/"
URL_CHANGE_STATE = "/change_state"
URL_FIRE_EVENT = "/fire_event"
URL_CALL_SERVICE = "/call_service"
URL_STATIC = "/static/{}" URL_STATIC = "/static/{}"
@ -196,11 +190,6 @@ class RequestHandler(BaseHTTPRequestHandler):
PATHS = [ # debug interface PATHS = [ # debug interface
('GET', URL_ROOT, '_handle_get_root'), ('GET', URL_ROOT, '_handle_get_root'),
# These get compiled as RE because these methods are reused
# by other urls that use url parameters
('POST', re.compile(URL_CHANGE_STATE), '_handle_change_state'),
('POST', re.compile(URL_FIRE_EVENT), '_handle_fire_event'),
('POST', re.compile(URL_CALL_SERVICE), '_handle_call_service'),
# /api - for validation purposes # /api - for validation purposes
('GET', rem.URL_API, '_handle_get_api'), ('GET', rem.URL_API, '_handle_get_api'),
@ -212,13 +201,16 @@ class RequestHandler(BaseHTTPRequestHandler):
'_handle_get_api_states_entity'), '_handle_get_api_states_entity'),
('POST', ('POST',
re.compile(r'/api/states/(?P<entity_id>[a-zA-Z\._0-9]+)'), re.compile(r'/api/states/(?P<entity_id>[a-zA-Z\._0-9]+)'),
'_handle_change_state'), '_handle_post_state_entity'),
('PUT',
re.compile(r'/api/states/(?P<entity_id>[a-zA-Z\._0-9]+)'),
'_handle_post_state_entity'),
# /events # /events
('GET', rem.URL_API_EVENTS, '_handle_get_api_events'), ('GET', rem.URL_API_EVENTS, '_handle_get_api_events'),
('POST', ('POST',
re.compile(r'/api/events/(?P<event_type>[a-zA-Z\._0-9]+)'), re.compile(r'/api/events/(?P<event_type>[a-zA-Z\._0-9]+)'),
'_handle_fire_event'), '_handle_api_post_events_event'),
# /services # /services
('GET', rem.URL_API_SERVICES, '_handle_get_api_services'), ('GET', rem.URL_API_SERVICES, '_handle_get_api_services'),
@ -226,7 +218,7 @@ class RequestHandler(BaseHTTPRequestHandler):
re.compile((r'/api/services/' re.compile((r'/api/services/'
r'(?P<domain>[a-zA-Z\._0-9]+)/' r'(?P<domain>[a-zA-Z\._0-9]+)/'
r'(?P<service>[a-zA-Z\._0-9]+)')), r'(?P<service>[a-zA-Z\._0-9]+)')),
'_handle_call_service'), '_handle_post_api_services_domain_service'),
# /event_forwarding # /event_forwarding
('POST', rem.URL_API_EVENT_FORWARD, '_handle_post_api_event_forward'), ('POST', rem.URL_API_EVENT_FORWARD, '_handle_post_api_event_forward'),
@ -247,20 +239,31 @@ class RequestHandler(BaseHTTPRequestHandler):
# Read query input # Read query input
data = parse_qs(url.query) data = parse_qs(url.query)
# parse_qs gives a list for each value, take the latest element
for key in data:
data[key] = data[key][-1]
# Did we get post input ? # Did we get post input ?
content_length = int(self.headers.get('Content-Length', 0)) content_length = int(self.headers.get('Content-Length', 0))
if content_length: if content_length:
data.update(parse_qs(self.rfile.read( body_content = self.rfile.read(content_length).decode("UTF-8")
content_length).decode("UTF-8"))) try:
data.update(json.loads(body_content))
except ValueError:
self.server.logger.exception(
"Exception parsing JSON: {}".format(body_content))
try: self.send_response(HTTP_UNPROCESSABLE_ENTITY)
api_password = data['api_password'][0] return
except KeyError:
api_password = '' api_password = self.headers.get(rem.AUTH_HEADER)
if not api_password and 'api_password' in data:
api_password = data['api_password']
if '_METHOD' in data: if '_METHOD' in data:
method = data['_METHOD'][0] method = data['_METHOD']
if url.path.startswith('/api/'): if url.path.startswith('/api/'):
self.use_json = True self.use_json = True
@ -313,6 +316,14 @@ class RequestHandler(BaseHTTPRequestHandler):
""" POST request handler. """ """ POST request handler. """
self._handle_request('POST') self._handle_request('POST')
def do_PUT(self): # pylint: disable=invalid-name
""" PUT request handler. """
self._handle_request('PUT')
def do_DELETE(self): # pylint: disable=invalid-name
""" DELETE request handler. """
self._handle_request('DELETE')
def _verify_api_password(self, api_password): def _verify_api_password(self, api_password):
""" Helper method to verify the API password """ Helper method to verify the API password
and take action if incorrect. """ and take action if incorrect. """
@ -402,18 +413,18 @@ class RequestHandler(BaseHTTPRequestHandler):
"<th>Attributes</th><th>Last Changed</th>" "<th>Attributes</th><th>Last Changed</th>"
"</tr>").format(self.server.api_password)) "</tr>").format(self.server.api_password))
for entity_id, state in \ for state in \
sorted(self.server.hass.states.all().items(), sorted(self.server.hass.states.all(),
key=lambda item: item[0].lower()): key=lambda item: item.entity_id.lower()):
domain = util.split_entity_id(entity_id)[0] domain = util.split_entity_id(state.entity_id)[0]
attributes = "<br>".join( attributes = "<br>".join(
"{}: {}".format(attr, val) "{}: {}".format(attr, val)
for attr, val in state.attributes.items()) for attr, val in state.attributes.items())
write("<tr><td>{}</td><td>{}</td><td>{}".format( write("<tr><td>{}</td><td>{}</td><td>{}".format(
_get_domain_icon(domain), entity_id, state.state)) _get_domain_icon(domain), state.entity_id, state.state))
if state.state == STATE_ON or state.state == STATE_OFF: if state.state == STATE_ON or state.state == STATE_OFF:
if state.state == STATE_ON: if state.state == STATE_ON:
@ -569,135 +580,6 @@ class RequestHandler(BaseHTTPRequestHandler):
write("</div></body></html>") write("</div></body></html>")
# pylint: disable=invalid-name
def _handle_change_state(self, path_match, data):
""" Handles updating the state of an entity.
This handles the following paths:
/change_state
/api/states/<entity_id>
"""
try:
try:
entity_id = path_match.group('entity_id')
except IndexError:
# If group 'entity_id' does not exist in path_match
entity_id = data['entity_id'][0]
new_state = data['new_state'][0]
try:
attributes = json.loads(data['attributes'][0])
except KeyError:
# Happens if key 'attributes' does not exist
attributes = None
# Write state
self.server.hass.states.set(entity_id, new_state, attributes)
# Return state if json, else redirect to main page
if self.use_json:
state = self.server.hass.states.get(entity_id)
self._write_json(state.as_dict(),
status_code=HTTP_CREATED,
location=
rem.URL_API_STATES_ENTITY.format(entity_id))
else:
self._message(
"State of {} changed to {}".format(entity_id, new_state))
except KeyError:
# If new_state don't exist in post data
self._message(
"No new_state submitted.", HTTP_BAD_REQUEST)
except ValueError:
# Occurs during error parsing json
self._message(
"Invalid JSON for attributes", HTTP_UNPROCESSABLE_ENTITY)
# pylint: disable=invalid-name
def _handle_fire_event(self, path_match, data):
""" Handles firing of an event.
This handles the following paths:
/fire_event
/api/events/<event_type>
Events from /api are threated as remote events.
"""
try:
try:
event_type = path_match.group('event_type')
event_origin = ha.EventOrigin.remote
except IndexError:
# If group event_type does not exist in path_match
event_type = data['event_type'][0]
event_origin = ha.EventOrigin.local
if 'event_data' in data:
event_data = json.loads(data['event_data'][0])
else:
event_data = None
# Special case handling for event STATE_CHANGED
# We will try to convert state dicts back to State objects
if event_type == ha.EVENT_STATE_CHANGED and event_data:
for key in ('old_state', 'new_state'):
state = ha.State.from_dict(event_data.get(key))
if state:
event_data[key] = state
self.server.hass.bus.fire(event_type, event_data, event_origin)
self._message("Event {} fired.".format(event_type))
except KeyError:
# Occurs if event_type does not exist in data
self._message("No event_type received.", HTTP_BAD_REQUEST)
except ValueError:
# Occurs during error parsing json
self._message(
"Invalid JSON for event_data", HTTP_UNPROCESSABLE_ENTITY)
def _handle_call_service(self, path_match, data):
""" Handles calling a service.
This handles the following paths:
/call_service
/api/services/<domain>/<service>
"""
try:
try:
domain = path_match.group('domain')
service = path_match.group('service')
except IndexError:
# If group domain or service does not exist in path_match
domain = data['domain'][0]
service = data['service'][0]
try:
service_data = json.loads(data['service_data'][0])
except KeyError:
# Happens if key 'service_data' does not exist
service_data = None
self.server.hass.call_service(domain, service, service_data)
self._message("Service {}/{} called.".format(domain, service))
except KeyError:
# Occurs if domain or service does not exist in data
self._message("No domain or service received.", HTTP_BAD_REQUEST)
except ValueError:
# Occurs during error parsing json
self._message(
"Invalid JSON for service_data", HTTP_UNPROCESSABLE_ENTITY)
# pylint: disable=unused-argument # pylint: disable=unused-argument
def _handle_get_api(self, path_match, data): def _handle_get_api(self, path_match, data):
""" Renders the debug interface. """ """ Renders the debug interface. """
@ -718,69 +600,152 @@ class RequestHandler(BaseHTTPRequestHandler):
if state: if state:
self._write_json(state) self._write_json(state)
else: else:
self._message("State does not exist.", HTTP_UNPROCESSABLE_ENTITY) self._message("State does not exist.", HTTP_NOT_FOUND)
def _handle_post_state_entity(self, path_match, data):
""" Handles updating the state of an entity.
This handles the following paths:
/api/states/<entity_id>
"""
entity_id = path_match.group('entity_id')
try:
new_state = data['state']
except KeyError:
self._message("state not specified", HTTP_BAD_REQUEST)
return
attributes = data['attributes'] if 'attributes' in data else None
is_new_state = self.server.hass.states.get(entity_id) is None
# Write state
self.server.hass.states.set(entity_id, new_state, attributes)
# Return state if json, else redirect to main page
if self.use_json:
state = self.server.hass.states.get(entity_id)
status_code = HTTP_CREATED if is_new_state else HTTP_OK
self._write_json(state.as_dict(),
status_code=status_code,
location=
rem.URL_API_STATES_ENTITY.format(entity_id))
else:
self._message(
"State of {} changed to {}".format(entity_id, new_state))
def _handle_get_api_events(self, path_match, data): def _handle_get_api_events(self, path_match, data):
""" Handles getting overview of event listeners. """ """ Handles getting overview of event listeners. """
self._write_json({'event_listeners': self.server.hass.bus.listeners}) self._write_json(self.server.hass.bus.listeners)
def _handle_api_post_events_event(self, path_match, data):
""" Handles firing of an event.
This handles the following paths:
/api/events/<event_type>
Events from /api are threated as remote events.
"""
event_type = path_match.group('event_type')
event_data = data.get('event_data')
if event_data is not None and not isinstance(event_data, dict):
self._message("event_data should be an object",
HTTP_UNPROCESSABLE_ENTITY)
event_origin = ha.EventOrigin.remote
# Special case handling for event STATE_CHANGED
# We will try to convert state dicts back to State objects
if event_type == ha.EVENT_STATE_CHANGED and event_data:
for key in ('old_state', 'new_state'):
state = ha.State.from_dict(event_data.get(key))
if state:
event_data[key] = state
self.server.hass.bus.fire(event_type, event_data, event_origin)
self._message("Event {} fired.".format(event_type))
def _handle_get_api_services(self, path_match, data): def _handle_get_api_services(self, path_match, data):
""" Handles getting overview of services. """ """ Handles getting overview of services. """
self._write_json({'services': self.server.hass.services.services}) self._write_json(self.server.hass.services.services)
# pylint: disable=invalid-name
def _handle_post_api_services_domain_service(self, path_match, data):
""" Handles calling a service.
This handles the following paths:
/api/services/<domain>/<service>
"""
domain = path_match.group('domain')
service = path_match.group('service')
service_data = data.get('service_data')
if service_data is not None and not isinstance(service_data, dict):
self._message("service_data should be an object",
HTTP_UNPROCESSABLE_ENTITY)
self.server.hass.call_service(domain, service, service_data)
self._message("Service {}/{} called.".format(domain, service))
# pylint: disable=invalid-name
def _handle_post_api_event_forward(self, path_match, data): def _handle_post_api_event_forward(self, path_match, data):
""" Handles adding an event forwarding target. """ """ Handles adding an event forwarding target. """
try: try:
host = data['host'][0] host = data['host']
api_password = data['api_password'][0] api_password = data['api_password']
port = int(data['port'][0]) if 'port' in data else None
if self.server.event_forwarder is None:
self.server.event_forwarder = \
rem.EventForwarder(self.server.hass)
api = rem.API(host, api_password, port)
self.server.event_forwarder.connect(api)
self._message("Event forwarding setup.")
except KeyError: except KeyError:
# Occurs if domain or service does not exist in data
self._message("No host or api_password received.", self._message("No host or api_password received.",
HTTP_BAD_REQUEST) HTTP_BAD_REQUEST)
return
try:
port = int(data['port']) if 'port' in data else None
except ValueError: except ValueError:
# Occurs during error parsing port
self._message( self._message(
"Invalid value received for port", HTTP_UNPROCESSABLE_ENTITY) "Invalid value received for port", HTTP_UNPROCESSABLE_ENTITY)
return
if self.server.event_forwarder is None:
self.server.event_forwarder = \
rem.EventForwarder(self.server.hass)
api = rem.API(host, api_password, port)
self.server.event_forwarder.connect(api)
self._message("Event forwarding setup.")
def _handle_delete_api_event_forward(self, path_match, data): def _handle_delete_api_event_forward(self, path_match, data):
""" Handles deleting an event forwarding target. """ """ Handles deleting an event forwarding target. """
try: try:
host = data['host'][0] host = data['host']
port = int(data['port'][0]) if 'port' in data else None
if self.server.event_forwarder is not None:
api = rem.API(host, None, port)
self.server.event_forwarder.disconnect(api)
self._message("Event forwarding cancelled.")
except KeyError: except KeyError:
# Occurs if domain or service does not exist in data self._message("No host received.",
self._message("No host or api_password received.",
HTTP_BAD_REQUEST) HTTP_BAD_REQUEST)
return
try:
port = int(data['port']) if 'port' in data else None
except ValueError: except ValueError:
# Occurs during error parsing port
self._message( self._message(
"Invalid value received for port", HTTP_UNPROCESSABLE_ENTITY) "Invalid value received for port", HTTP_UNPROCESSABLE_ENTITY)
return
if self.server.event_forwarder is not None:
api = rem.API(host, None, port)
self.server.event_forwarder.disconnect(api)
self._message("Event forwarding cancelled.")
def _handle_get_static(self, path_match, data): def _handle_get_static(self, path_match, data):
""" Returns a static file. """ """ Returns a static file. """
@ -838,7 +803,7 @@ class RequestHandler(BaseHTTPRequestHandler):
self.end_headers() self.end_headers()
if data: if data is not None:
self.wfile.write( self.wfile.write(
json.dumps(data, indent=4, sort_keys=True, json.dumps(data, indent=4, sort_keys=True,
cls=rem.JSONEncoder).encode("UTF-8")) cls=rem.JSONEncoder).encode("UTF-8"))

View File

@ -21,6 +21,8 @@ import homeassistant as ha
SERVER_PORT = 8123 SERVER_PORT = 8123
AUTH_HEADER = "HA-access"
URL_API = "/api/" URL_API = "/api/"
URL_API_STATES = "/api/states" URL_API_STATES = "/api/states"
URL_API_STATES_ENTITY = "/api/states/{}" URL_API_STATES_ENTITY = "/api/states/{}"
@ -57,6 +59,7 @@ class API(object):
self.api_password = api_password self.api_password = api_password
self.base_url = "http://{}:{}".format(host, self.port) self.base_url = "http://{}:{}".format(host, self.port)
self.status = None self.status = None
self._headers = {AUTH_HEADER: api_password}
def validate_api(self, force_validate=False): def validate_api(self, force_validate=False):
""" Tests if we can communicate with the API. """ """ Tests if we can communicate with the API. """
@ -67,16 +70,18 @@ class API(object):
def __call__(self, method, path, data=None): def __call__(self, method, path, data=None):
""" Makes a call to the Home Assistant api. """ """ Makes a call to the Home Assistant api. """
data = data or {} if data is not None:
data['api_password'] = self.api_password data = json.dumps(data, cls=JSONEncoder)
url = urllib.parse.urljoin(self.base_url, path) url = urllib.parse.urljoin(self.base_url, path)
try: try:
if method == METHOD_GET: if method == METHOD_GET:
return requests.get(url, params=data, timeout=5) return requests.get(
url, params=data, timeout=5, headers=self._headers)
else: else:
return requests.request(method, url, data=data, timeout=5) return requests.request(
method, url, data=data, timeout=5, headers=self._headers)
except requests.exceptions.ConnectionError: except requests.exceptions.ConnectionError:
logging.getLogger(__name__).exception("Error connecting to server") logging.getLogger(__name__).exception("Error connecting to server")
@ -226,7 +231,8 @@ class StateMachine(ha.StateMachine):
def mirror(self): def mirror(self):
""" Discards current data and mirrors the remote state machine. """ """ Discards current data and mirrors the remote state machine. """
self._states = get_states(self._api, self.logger) self._states = {state.entity_id: state for state
in get_states(self._api, self.logger)}
def _state_changed_listener(self, event): def _state_changed_listener(self, event):
""" Listens for state changed events and applies them. """ """ Listens for state changed events and applies them. """
@ -297,11 +303,10 @@ def get_event_listeners(api, logger=None):
try: try:
req = api(METHOD_GET, URL_API_EVENTS) req = api(METHOD_GET, URL_API_EVENTS)
return req.json()['event_listeners'] if req.status_code == 200 else {} return req.json() if req.status_code == 200 else {}
except (ha.HomeAssistantError, ValueError, KeyError): except (ha.HomeAssistantError, ValueError):
# ValueError if req.json() can't parse the json # ValueError if req.json() can't parse the json
# KeyError if 'event_listeners' not found in parsed json
if logger: if logger:
logger.exception("Bus:Got unexpected result") logger.exception("Bus:Got unexpected result")
@ -312,7 +317,7 @@ def fire_event(api, event_type, event_data=None, logger=None):
""" Fire an event at remote API. """ """ Fire an event at remote API. """
if event_data: if event_data:
data = {'event_data': json.dumps(event_data, cls=JSONEncoder)} data = {'event_data': event_data}
else: else:
data = None data = None
@ -355,20 +360,11 @@ def get_states(api, logger=None):
req = api(METHOD_GET, req = api(METHOD_GET,
URL_API_STATES) URL_API_STATES)
json_result = req.json() return [ha.State.from_dict(item) for
states = {} item in req.json()]
for entity_id, state_dict in json_result.items():
state = ha.State.from_dict(state_dict)
if state:
states[entity_id] = state
return states
except (ha.HomeAssistantError, ValueError, AttributeError): except (ha.HomeAssistantError, ValueError, AttributeError):
# ValueError if req.json() can't parse the json # ValueError if req.json() can't parse the json
# AttributeError if parsed JSON was not a dict
if logger: if logger:
logger.exception("Error getting state") logger.exception("Error getting state")
@ -380,8 +376,8 @@ def set_state(api, entity_id, new_state, attributes=None, logger=None):
attributes = attributes or {} attributes = attributes or {}
data = {'new_state': new_state, data = {'state': new_state,
'attributes': json.dumps(attributes)} 'attributes': attributes}
try: try:
req = api(METHOD_POST, req = api(METHOD_POST,
@ -410,11 +406,10 @@ def get_services(api, logger=None):
try: try:
req = api(METHOD_GET, URL_API_SERVICES) req = api(METHOD_GET, URL_API_SERVICES)
return req.json()['services'] if req.status_code == 200 else {} return req.json() if req.status_code == 200 else {}
except (ha.HomeAssistantError, ValueError, KeyError): except (ha.HomeAssistantError, ValueError):
# ValueError if req.json() can't parse the json # ValueError if req.json() can't parse the json
# KeyError if not all expected keys are in the returned JSON
if logger: if logger:
logger.exception("ServiceRegistry:Got unexpected result") logger.exception("ServiceRegistry:Got unexpected result")

View File

@ -8,6 +8,7 @@ Provides tests to verify that Home Assistant modules do what they should do.
import unittest import unittest
import time import time
import json
import requests import requests
@ -19,6 +20,8 @@ API_PASSWORD = "test1234"
HTTP_BASE_URL = "http://127.0.0.1:{}".format(remote.SERVER_PORT) HTTP_BASE_URL = "http://127.0.0.1:{}".format(remote.SERVER_PORT)
HA_HEADERS = {remote.AUTH_HEADER: API_PASSWORD}
def _url(path=""): def _url(path=""):
""" Helper method to generate urls. """ """ Helper method to generate urls. """
@ -37,7 +40,7 @@ def ensure_homeassistant_started():
if not HAHelper.hass: if not HAHelper.hass:
hass = ha.HomeAssistant() hass = ha.HomeAssistant()
hass.bus.listen('test_event', len) hass.bus.listen('test_event', lambda _: _)
hass.states.set('test', 'a_state') hass.states.set('test', 'a_state')
http.setup(hass, http.setup(hass,
@ -90,8 +93,7 @@ class TestHTTP(unittest.TestCase):
""" Test if we can login by comparing not logged in screen to """ Test if we can login by comparing not logged in screen to
logged in screen. """ logged in screen. """
with_pw = requests.get( with_pw = requests.get(_url(), headers=HA_HEADERS)
_url("/?api_password={}".format(API_PASSWORD)))
without_pw = requests.get(_url()) without_pw = requests.get(_url())
@ -107,62 +109,24 @@ class TestHTTP(unittest.TestCase):
req = requests.get( req = requests.get(
_url(remote.URL_API_STATES_ENTITY.format("test")), _url(remote.URL_API_STATES_ENTITY.format("test")),
params={"api_password": "not the password"}) headers={remote.AUTH_HEADER: 'wrongpassword'})
self.assertEqual(req.status_code, 401) self.assertEqual(req.status_code, 401)
def test_debug_change_state(self):
""" Test if we can change a state from the debug interface. """
self.hass.states.set("test.test", "not_to_be_set")
requests.post(_url(http.URL_CHANGE_STATE),
data={"entity_id": "test.test",
"new_state": "debug_state_change2",
"api_password": API_PASSWORD})
self.assertEqual(self.hass.states.get("test.test").state,
"debug_state_change2")
def test_debug_fire_event(self):
""" Test if we can fire an event from the debug interface. """
test_value = []
def listener(event): # pylint: disable=unused-argument
""" Helper method that will verify that our event got called and
that test if our data came through. """
if "test" in event.data:
test_value.append(1)
self.hass.listen_once_event("test_event_with_data", listener)
requests.post(
_url(http.URL_FIRE_EVENT),
data={"event_type": "test_event_with_data",
"event_data": '{"test": 1}',
"api_password": API_PASSWORD})
# Allow the event to take place
time.sleep(1)
self.assertEqual(len(test_value), 1)
def test_api_list_state_entities(self): def test_api_list_state_entities(self):
""" Test if the debug interface allows us to list state entities. """ """ Test if the debug interface allows us to list state entities. """
req = requests.get(_url(remote.URL_API_STATES), req = requests.get(_url(remote.URL_API_STATES),
data={"api_password": API_PASSWORD}) headers=HA_HEADERS)
remote_data = req.json() remote_data = [ha.State.from_dict(item) for item in req.json()]
local_data = {entity_id: state.as_dict() for entity_id, state self.assertEqual(self.hass.states.all(), remote_data)
in self.hass.states.all().items()}
self.assertEqual(local_data, remote_data)
def test_api_get(self): def test_api_get(self):
""" Test if the debug interface allows us to get a state. """ """ Test if the debug interface allows us to get a state. """
req = requests.get( req = requests.get(
_url(remote.URL_API_STATES_ENTITY.format("test")), _url(remote.URL_API_STATES_ENTITY.format("test")),
data={"api_password": API_PASSWORD}) headers=HA_HEADERS)
data = ha.State.from_dict(req.json()) data = ha.State.from_dict(req.json())
@ -176,9 +140,9 @@ class TestHTTP(unittest.TestCase):
""" Test if the debug interface allows us to get a state. """ """ Test if the debug interface allows us to get a state. """
req = requests.get( req = requests.get(
_url(remote.URL_API_STATES_ENTITY.format("does_not_exist")), _url(remote.URL_API_STATES_ENTITY.format("does_not_exist")),
params={"api_password": API_PASSWORD}) headers=HA_HEADERS)
self.assertEqual(req.status_code, 422) self.assertEqual(req.status_code, 404)
def test_api_state_change(self): def test_api_state_change(self):
""" Test if we can change the state of an entity that exists. """ """ Test if we can change the state of an entity that exists. """
@ -186,8 +150,8 @@ class TestHTTP(unittest.TestCase):
self.hass.states.set("test.test", "not_to_be_set") self.hass.states.set("test.test", "not_to_be_set")
requests.post(_url(remote.URL_API_STATES_ENTITY.format("test.test")), requests.post(_url(remote.URL_API_STATES_ENTITY.format("test.test")),
data={"new_state": "debug_state_change2", data=json.dumps({"state": "debug_state_change2",
"api_password": API_PASSWORD}) "api_password": API_PASSWORD}))
self.assertEqual(self.hass.states.get("test.test").state, self.assertEqual(self.hass.states.get("test.test").state,
"debug_state_change2") "debug_state_change2")
@ -202,8 +166,8 @@ class TestHTTP(unittest.TestCase):
req = requests.post( req = requests.post(
_url(remote.URL_API_STATES_ENTITY.format( _url(remote.URL_API_STATES_ENTITY.format(
"test_entity_that_does_not_exist")), "test_entity_that_does_not_exist")),
data={"new_state": new_state, data=json.dumps({"state": new_state,
"api_password": API_PASSWORD}) "api_password": API_PASSWORD}))
cur_state = (self.hass.states. cur_state = (self.hass.states.
get("test_entity_that_does_not_exist").state) get("test_entity_that_does_not_exist").state)
@ -224,7 +188,7 @@ class TestHTTP(unittest.TestCase):
requests.post( requests.post(
_url(remote.URL_API_EVENTS_EVENT.format("test.event_no_data")), _url(remote.URL_API_EVENTS_EVENT.format("test.event_no_data")),
data={"api_password": API_PASSWORD}) headers=HA_HEADERS)
# Allow the event to take place # Allow the event to take place
time.sleep(1) time.sleep(1)
@ -246,8 +210,8 @@ class TestHTTP(unittest.TestCase):
requests.post( requests.post(
_url(remote.URL_API_EVENTS_EVENT.format("test_event_with_data")), _url(remote.URL_API_EVENTS_EVENT.format("test_event_with_data")),
data={"event_data": '{"test": 1}', data=json.dumps({"event_data": {"test": 1}}),
"api_password": API_PASSWORD}) headers=HA_HEADERS)
# Allow the event to take place # Allow the event to take place
time.sleep(1) time.sleep(1)
@ -267,8 +231,8 @@ class TestHTTP(unittest.TestCase):
req = requests.post( req = requests.post(
_url(remote.URL_API_EVENTS_EVENT.format("test_event")), _url(remote.URL_API_EVENTS_EVENT.format("test_event")),
data={"event_data": 'not json', data=json.dumps({"event_data": 'not an object'}),
"api_password": API_PASSWORD}) headers=HA_HEADERS)
# It shouldn't but if it fires, allow the event to take place # It shouldn't but if it fires, allow the event to take place
time.sleep(1) time.sleep(1)
@ -279,20 +243,16 @@ class TestHTTP(unittest.TestCase):
def test_api_get_event_listeners(self): def test_api_get_event_listeners(self):
""" Test if we can get the list of events being listened for. """ """ Test if we can get the list of events being listened for. """
req = requests.get(_url(remote.URL_API_EVENTS), req = requests.get(_url(remote.URL_API_EVENTS),
params={"api_password": API_PASSWORD}) headers=HA_HEADERS)
data = req.json() self.assertEqual(req.json(), self.hass.bus.listeners)
self.assertEqual(data['event_listeners'], self.hass.bus.listeners)
def test_api_get_services(self): def test_api_get_services(self):
""" Test if we can get a dict describing current services. """ """ Test if we can get a dict describing current services. """
req = requests.get(_url(remote.URL_API_SERVICES), req = requests.get(_url(remote.URL_API_SERVICES),
params={"api_password": API_PASSWORD}) headers=HA_HEADERS)
data = req.json() self.assertEqual(req.json(), self.hass.services.services)
self.assertEqual(data['services'], self.hass.services.services)
def test_api_call_service_no_data(self): def test_api_call_service_no_data(self):
""" Test if the API allows us to call a service. """ """ Test if the API allows us to call a service. """
@ -307,7 +267,7 @@ class TestHTTP(unittest.TestCase):
requests.post( requests.post(
_url(remote.URL_API_SERVICES_SERVICE.format( _url(remote.URL_API_SERVICES_SERVICE.format(
"test_domain", "test_service")), "test_domain", "test_service")),
data={"api_password": API_PASSWORD}) headers=HA_HEADERS)
# Allow the event to take place # Allow the event to take place
time.sleep(1) time.sleep(1)
@ -329,8 +289,8 @@ class TestHTTP(unittest.TestCase):
requests.post( requests.post(
_url(remote.URL_API_SERVICES_SERVICE.format( _url(remote.URL_API_SERVICES_SERVICE.format(
"test_domain", "test_service")), "test_domain", "test_service")),
data={"service_data": '{"test": 1}', data=json.dumps({"service_data": {"test": 1}}),
"api_password": API_PASSWORD}) headers=HA_HEADERS)
# Allow the event to take place # Allow the event to take place
time.sleep(1) time.sleep(1)