diff --git a/homeassistant/__init__.py b/homeassistant/__init__.py index c06ef6f35ab..1372f86b3b4 100644 --- a/homeassistant/__init__.py +++ b/homeassistant/__init__.py @@ -62,6 +62,10 @@ class HomeAssistant(object): self.services = ServiceRegistry(self.bus, pool) self.states = StateMachine(self.bus) + # Components in a thread we might want to stop later + self.timer = None + self.http = None + self.config_dir = os.getcwd() def get_config_path(self, path): @@ -70,7 +74,7 @@ class HomeAssistant(object): def start(self): """ Start home assistant. """ - Timer(self) + self.timer = Timer(self) self.bus.fire(EVENT_HOMEASSISTANT_START) @@ -89,6 +93,8 @@ class HomeAssistant(object): except KeyboardInterrupt: break + self.stop() + def call_service(self, domain, service, service_data=None): """ Fires event to call specified service. """ event_data = service_data or {} @@ -224,6 +230,18 @@ class HomeAssistant(object): self.bus.listen(event_type, onetime_listener) + def stop(self): + """ Stops Home Assistant and shuts down all threads. """ + _LOGGER.info("Stopping") + + self._pool.stop() + + if self.http is not None: + self.http.shutdown() + + if self.timer is not None: + self.timer.shutdown() + def _process_match_param(parameter): """ Wraps parameter in a list if it is not one and returns it. """ @@ -598,6 +616,7 @@ class Timer(threading.Thread): self.daemon = True self._bus = hass.bus self.interval = interval or TIMER_INTERVAL + self._stop = threading.Event() # We want to be able to fire every time a minute starts (seconds=0). # We want this so other modules can use that to make sure they fire @@ -617,7 +636,7 @@ class Timer(threading.Thread): calc_now = dt.datetime.now interval = self.interval - while True: + while not self._stop.isSet(): now = calc_now() # First check checks if we are not on a second matching the @@ -642,6 +661,10 @@ class Timer(threading.Thread): self._bus.fire(EVENT_TIME_CHANGED, {ATTR_NOW: now}) + def shutdown(self): + _LOGGER.info("Timer:Stopping") + self._stop.set() + class HomeAssistantError(Exception): """ General Home Assistant exception occured. """ diff --git a/homeassistant/components/http/__init__.py b/homeassistant/components/http/__init__.py index 0072e5c2f38..ecf83f06c7d 100644 --- a/homeassistant/components/http/__init__.py +++ b/homeassistant/components/http/__init__.py @@ -110,7 +110,7 @@ CONF_DEVELOPMENT = "development" _LOGGER = logging.getLogger(__name__) - +# TODO add shutdown https://docs.python.org/3.4/library/socketserver.html#socketserver.BaseServer.shutdown def setup(hass, config): """ Sets up the HTTP API and debug interface. """ @@ -141,12 +141,16 @@ def setup(hass, config): hass.local_api = \ rem.API(util.get_local_ip(), api_password, server_port) + hass.server = server + return True class HomeAssistantHTTPServer(ThreadingMixIn, HTTPServer): """ Handle HTTP requests in a threaded fashion. """ + allow_reuse_address = True + # pylint: disable=too-many-arguments def __init__(self, server_address, RequestHandlerClass, hass, api_password, development=False): diff --git a/homeassistant/test.py b/homeassistant/test.py deleted file mode 100644 index e960596a58d..00000000000 --- a/homeassistant/test.py +++ /dev/null @@ -1,897 +0,0 @@ -""" -homeassistant.test -~~~~~~~~~~~~~~~~~~ - -Provides tests to verify that Home Assistant modules do what they should do. - -""" -# pylint: disable=protected-access,too-many-public-methods -import re -import os -import unittest -import time -import json -import threading -from datetime import datetime - -import requests - -import homeassistant as ha -import homeassistant.loader as loader -import homeassistant.util as util -import homeassistant.remote as remote -import homeassistant.components as comps -import homeassistant.components.http as http - -API_PASSWORD = "test1234" - -HTTP_BASE_URL = "http://127.0.0.1:{}".format(remote.SERVER_PORT) - -HA_HEADERS = {remote.AUTH_HEADER: API_PASSWORD} - - -def _url(path=""): - """ Helper method to generate urls. """ - return HTTP_BASE_URL + path - - -class HAHelper(object): # pylint: disable=too-few-public-methods - """ Helper class to keep track of current running HA instance. """ - hass = None - slave = None - - -def ensure_homeassistant_started(): - """ Ensures home assistant is started. """ - - if not HAHelper.hass: - hass = ha.HomeAssistant() - - hass.bus.listen('test_event', lambda _: _) - hass.states.set('test.test', 'a_state') - - http.setup(hass, - {http.DOMAIN: {http.CONF_API_PASSWORD: API_PASSWORD}}) - - hass.start() - - HAHelper.hass = hass - - return HAHelper.hass - - -def ensure_slave_started(): - """ Ensure a home assistant slave is started. """ - - ensure_homeassistant_started() - - if not HAHelper.slave: - local_api = remote.API("127.0.0.1", API_PASSWORD, 8124) - remote_api = remote.API("127.0.0.1", API_PASSWORD) - slave = remote.HomeAssistant(remote_api, local_api) - - http.setup(slave, - {http.DOMAIN: {http.CONF_API_PASSWORD: API_PASSWORD, - http.CONF_SERVER_PORT: 8124}}) - - slave.start() - - HAHelper.slave = slave - - return HAHelper.slave - - -class TestHomeAssistant(unittest.TestCase): - """ - Tests the Home Assistant core classes. - Currently only includes tests to test cases that do not - get tested in the API integration tests. - """ - - def setUp(self): # pylint: disable=invalid-name - """ things to be run when tests are started. """ - self.hass = ha.HomeAssistant() - self.hass.states.set("light.Bowl", "on") - self.hass.states.set("switch.AC", "off") - - def test_get_config_path(self): - """ Test get_config_path method. """ - self.assertEqual(os.getcwd(), self.hass.config_dir) - - self.assertEqual(os.path.join(os.getcwd(), "test.conf"), - self.hass.get_config_path("test.conf")) - - def test_block_till_stoped(self): - """ Test if we can block till stop service is called. """ - blocking_thread = threading.Thread(target=self.hass.block_till_stopped) - - self.assertFalse(blocking_thread.is_alive()) - - blocking_thread.start() - # Python will now give attention to the other thread - time.sleep(.01) - - self.assertTrue(blocking_thread.is_alive()) - - self.hass.call_service(ha.DOMAIN, ha.SERVICE_HOMEASSISTANT_STOP) - self.hass._pool.block_till_done() - - # hass.block_till_stopped checks every second if it should quit - # we have to wait worst case 1 second - wait_loops = 0 - while blocking_thread.is_alive() and wait_loops < 10: - wait_loops += 1 - time.sleep(0.1) - - self.assertFalse(blocking_thread.is_alive()) - - def test_get_entity_ids(self): - """ Test get_entity_ids method. """ - ent_ids = self.hass.get_entity_ids() - self.assertEqual(2, len(ent_ids)) - self.assertTrue('light.Bowl' in ent_ids) - self.assertTrue('switch.AC' in ent_ids) - - ent_ids = self.hass.get_entity_ids('light') - self.assertEqual(1, len(ent_ids)) - self.assertTrue('light.Bowl' in ent_ids) - - def test_track_state_change(self): - """ Test track_state_change. """ - # 2 lists to track how often our callbacks got called - specific_runs = [] - wildcard_runs = [] - - self.hass.track_state_change( - 'light.Bowl', lambda a, b, c: specific_runs.append(1), 'on', 'off') - - self.hass.track_state_change( - 'light.Bowl', lambda a, b, c: wildcard_runs.append(1), - ha.MATCH_ALL, ha.MATCH_ALL) - - # Set same state should not trigger a state change/listener - self.hass.states.set('light.Bowl', 'on') - self.hass._pool.block_till_done() - self.assertEqual(0, len(specific_runs)) - self.assertEqual(0, len(wildcard_runs)) - - # State change off -> on - self.hass.states.set('light.Bowl', 'off') - self.hass._pool.block_till_done() - self.assertEqual(1, len(specific_runs)) - self.assertEqual(1, len(wildcard_runs)) - - # State change off -> off - self.hass.states.set('light.Bowl', 'off', {"some_attr": 1}) - self.hass._pool.block_till_done() - self.assertEqual(1, len(specific_runs)) - self.assertEqual(2, len(wildcard_runs)) - - # State change off -> on - self.hass.states.set('light.Bowl', 'on') - self.hass._pool.block_till_done() - self.assertEqual(1, len(specific_runs)) - self.assertEqual(3, len(wildcard_runs)) - - def test_listen_once_event(self): - """ Test listen_once_event method. """ - runs = [] - - self.hass.listen_once_event('test_event', lambda x: runs.append(1)) - - self.hass.bus.fire('test_event') - self.hass._pool.block_till_done() - self.assertEqual(1, len(runs)) - - # Second time it should not increase runs - self.hass.bus.fire('test_event') - self.hass._pool.block_till_done() - self.assertEqual(1, len(runs)) - - def test_track_point_in_time(self): - """ Test track point in time. """ - before_birthday = datetime(1985, 7, 9, 12, 0, 0) - birthday_paulus = datetime(1986, 7, 9, 12, 0, 0) - after_birthday = datetime(1987, 7, 9, 12, 0, 0) - - runs = [] - - self.hass.track_point_in_time( - lambda x: runs.append(1), birthday_paulus) - - self._send_time_changed(before_birthday) - self.hass._pool.block_till_done() - self.assertEqual(0, len(runs)) - - self._send_time_changed(birthday_paulus) - self.hass._pool.block_till_done() - self.assertEqual(1, len(runs)) - - # A point in time tracker will only fire once, this should do nothing - self._send_time_changed(birthday_paulus) - self.hass._pool.block_till_done() - self.assertEqual(1, len(runs)) - - self.hass.track_point_in_time( - lambda x: runs.append(1), birthday_paulus) - - self._send_time_changed(after_birthday) - self.hass._pool.block_till_done() - self.assertEqual(2, len(runs)) - - def test_track_time_change(self): - """ Test tracking time change. """ - wildcard_runs = [] - specific_runs = [] - - self.hass.track_time_change(lambda x: wildcard_runs.append(1)) - self.hass.track_time_change( - lambda x: specific_runs.append(1), second=[0, 30]) - - self._send_time_changed(datetime(2014, 5, 24, 12, 0, 0)) - self.hass._pool.block_till_done() - self.assertEqual(1, len(specific_runs)) - self.assertEqual(1, len(wildcard_runs)) - - self._send_time_changed(datetime(2014, 5, 24, 12, 0, 15)) - self.hass._pool.block_till_done() - self.assertEqual(1, len(specific_runs)) - self.assertEqual(2, len(wildcard_runs)) - - self._send_time_changed(datetime(2014, 5, 24, 12, 0, 30)) - self.hass._pool.block_till_done() - self.assertEqual(2, len(specific_runs)) - self.assertEqual(3, len(wildcard_runs)) - - def _send_time_changed(self, now): - """ Send a time changed event. """ - self.hass.bus.fire(ha.EVENT_TIME_CHANGED, {ha.ATTR_NOW: now}) - - -class TestEvent(unittest.TestCase): - """ Test Event class. """ - def test_repr(self): - """ Test that repr method works. #MoreCoverage """ - self.assertEqual( - "", - str(ha.Event("TestEvent"))) - - self.assertEqual( - "", - str(ha.Event("TestEvent", - {"beer": "nice"}, - ha.EventOrigin.remote))) - - -class TestEventBus(unittest.TestCase): - """ Test EventBus methods. """ - - def setUp(self): # pylint: disable=invalid-name - """ things to be run when tests are started. """ - self.bus = ha.EventBus() - self.bus.listen('test_event', lambda x: len) - - def test_add_remove_listener(self): - """ Test remove_listener method. """ - old_count = len(self.bus.listeners) - - listener = lambda x: len - - self.bus.listen('test', listener) - - self.assertEqual(old_count + 1, len(self.bus.listeners)) - - # Try deleting a non registered listener, nothing should happen - self.bus.remove_listener('test', lambda x: len) - - # Remove listener - self.bus.remove_listener('test', listener) - self.assertEqual(old_count, len(self.bus.listeners)) - - # Try deleting listener while category doesn't exist either - self.bus.remove_listener('test', listener) - - -class TestState(unittest.TestCase): - """ Test EventBus methods. """ - - def test_init(self): - """ Test state.init """ - self.assertRaises( - ha.InvalidEntityFormatError, ha.State, - 'invalid_entity_format', 'test_state') - - def test_repr(self): - """ Test state.repr """ - self.assertEqual("", - str(ha.State( - "happy.happy", "on", - last_changed=datetime(1984, 12, 8, 12, 0, 0)))) - - self.assertEqual("", - str(ha.State("happy.happy", "on", {"brightness": 144}, - datetime(1984, 12, 8, 12, 0, 0)))) - - -class TestStateMachine(unittest.TestCase): - """ Test EventBus methods. """ - - def setUp(self): # pylint: disable=invalid-name - """ things to be run when tests are started. """ - self.bus = ha.EventBus() - self.states = ha.StateMachine(self.bus) - self.states.set("light.Bowl", "on") - self.states.set("switch.AC", "off") - - def test_is_state(self): - """ Test is_state method. """ - self.assertTrue(self.states.is_state('light.Bowl', 'on')) - self.assertFalse(self.states.is_state('light.Bowl', 'off')) - self.assertFalse(self.states.is_state('light.Non_existing', 'on')) - - def test_remove(self): - """ Test remove method. """ - self.assertTrue('light.Bowl' in self.states.entity_ids) - self.assertTrue(self.states.remove('light.Bowl')) - self.assertFalse('light.Bowl' in self.states.entity_ids) - - # If it does not exist, we should get False - self.assertFalse(self.states.remove('light.Bowl')) - - -class TestServiceCall(unittest.TestCase): - """ Test ServiceCall class. """ - def test_repr(self): - """ Test repr method. """ - self.assertEqual( - "", - str(ha.ServiceCall('homeassistant', 'start'))) - - self.assertEqual( - "", - str(ha.ServiceCall('homeassistant', 'start', {"fast": "yes"}))) - - -class TestServiceRegistry(unittest.TestCase): - """ Test EventBus methods. """ - - def setUp(self): # pylint: disable=invalid-name - """ things to be run when tests are started. """ - pool = ha.create_worker_pool() - self.bus = ha.EventBus(pool) - self.services = ha.ServiceRegistry(self.bus, pool) - self.services.register("test_domain", "test_service", lambda x: len) - - def test_has_service(self): - """ Test has_service method. """ - self.assertTrue( - self.services.has_service("test_domain", "test_service")) - - -class TestLoader(unittest.TestCase): - """ Test the loader module. """ - def setUp(self): # pylint: disable=invalid-name - self.hass = ha.HomeAssistant() - loader.prepare(self.hass) - - def test_get_component(self): - """ Test if get_component works. """ - self.assertEqual(http, loader.get_component('http')) - - -class TestUtil(unittest.TestCase): - """ Tests util methods. """ - def test_sanitize_filename(self): - """ Test sanitize_filename. """ - self.assertEqual("test", util.sanitize_filename("test")) - self.assertEqual("test", util.sanitize_filename("/test")) - self.assertEqual("test", util.sanitize_filename("..test")) - self.assertEqual("test", util.sanitize_filename("\\test")) - self.assertEqual("test", util.sanitize_filename("\\../test")) - - def test_sanitize_path(self): - """ Test sanitize_path. """ - self.assertEqual("test/path", util.sanitize_path("test/path")) - self.assertEqual("test/path", util.sanitize_path("~test/path")) - self.assertEqual("//test/path", - util.sanitize_path("~/../test/path")) - - def test_slugify(self): - """ Test slugify. """ - self.assertEqual("Test", util.slugify("T-!@#$!#@$!$est")) - self.assertEqual("Test_More", util.slugify("Test More")) - self.assertEqual("Test_More", util.slugify("Test_(More)")) - - def test_datetime_to_str(self): - """ Test datetime_to_str. """ - self.assertEqual("12:00:00 09-07-1986", - util.datetime_to_str(datetime(1986, 7, 9, 12, 0, 0))) - - def test_str_to_datetime(self): - """ Test str_to_datetime. """ - self.assertEqual(datetime(1986, 7, 9, 12, 0, 0), - util.str_to_datetime("12:00:00 09-07-1986")) - - def test_split_entity_id(self): - """ Test split_entity_id. """ - self.assertEqual(['domain', 'object_id'], - util.split_entity_id('domain.object_id')) - - def test_repr_helper(self): - """ Test repr_helper. """ - self.assertEqual("A", util.repr_helper("A")) - self.assertEqual("5", util.repr_helper(5)) - self.assertEqual("True", util.repr_helper(True)) - self.assertEqual("test=1", - util.repr_helper({"test": 1})) - self.assertEqual("12:00:00 09-07-1986", - util.repr_helper(datetime(1986, 7, 9, 12, 0, 0))) - - # pylint: disable=invalid-name - def test_color_RGB_to_xy(self): - """ Test color_RGB_to_xy. """ - self.assertEqual((0, 0), util.color_RGB_to_xy(0, 0, 0)) - self.assertEqual((0.3127159072215825, 0.3290014805066623), - util.color_RGB_to_xy(255, 255, 255)) - - self.assertEqual((0.15001662234042554, 0.060006648936170214), - util.color_RGB_to_xy(0, 0, 255)) - - self.assertEqual((0.3, 0.6), util.color_RGB_to_xy(0, 255, 0)) - - self.assertEqual((0.6400744994567747, 0.3299705106316933), - util.color_RGB_to_xy(255, 0, 0)) - - def test_convert(self): - """ Test convert. """ - self.assertEqual(5, util.convert("5", int)) - self.assertEqual(5.0, util.convert("5", float)) - self.assertEqual(True, util.convert("True", bool)) - self.assertEqual(1, util.convert("NOT A NUMBER", int, 1)) - self.assertEqual(1, util.convert(None, int, 1)) - - def test_ensure_unique_string(self): - """ Test ensure_unique_string. """ - self.assertEqual( - "Beer_3", - util.ensure_unique_string("Beer", ["Beer", "Beer_2"])) - - -class TestComponentsCore(unittest.TestCase): - """ Tests homeassistant.components module. """ - - def setUp(self): # pylint: disable=invalid-name - """ Init needed objects. """ - self.hass = ha.HomeAssistant() - loader.prepare(self.hass) - self.assertTrue(comps.setup(self.hass, {})) - - self.hass.states.set('light.Bowl', comps.STATE_ON) - self.hass.states.set('light.Ceiling', comps.STATE_OFF) - - def test_is_on(self): - """ Test is_on method. """ - self.assertTrue(comps.is_on(self.hass, 'light.Bowl')) - self.assertFalse(comps.is_on(self.hass, 'light.Ceiling')) - self.assertTrue(comps.is_on(self.hass)) - - def test_turn_on(self): - """ Test turn_on method. """ - runs = [] - self.hass.services.register( - 'light', comps.SERVICE_TURN_ON, lambda x: runs.append(1)) - - comps.turn_on(self.hass, 'light.Ceiling') - - self.hass._pool.block_till_done() - - self.assertEqual(1, len(runs)) - - def test_turn_off(self): - """ Test turn_off method. """ - runs = [] - self.hass.services.register( - 'light', comps.SERVICE_TURN_OFF, lambda x: runs.append(1)) - - comps.turn_off(self.hass, 'light.Bowl') - - self.hass._pool.block_till_done() - - self.assertEqual(1, len(runs)) - - def test_extract_entity_ids(self): - """ Test extract_entity_ids method. """ - call = ha.ServiceCall('light', 'turn_on', - {comps.ATTR_ENTITY_ID: 'light.Bowl'}) - - self.assertEqual(['light.Bowl'], - comps.extract_entity_ids(self.hass, call)) - - call = ha.ServiceCall('light', 'turn_on', - {comps.ATTR_ENTITY_ID: ['light.Bowl']}) - - self.assertEqual(['light.Bowl'], - comps.extract_entity_ids(self.hass, call)) - - -class TestHTTP(unittest.TestCase): - """ Test the HTTP debug interface and API. """ - - @classmethod - def setUpClass(cls): # pylint: disable=invalid-name - """ things to be run when tests are started. """ - cls.hass = ensure_homeassistant_started() - - def test_get_frontend(self): - """ Tests if we can get the frontend. """ - req = requests.get(_url("")) - - self.assertEqual(200, req.status_code) - - frontendjs = re.search( - r'(?P\/static\/frontend-[A-Za-z0-9]{32}.html)', - req.text).groups(0)[0] - - self.assertIsNotNone(frontendjs) - - req = requests.get(_url(frontendjs)) - - self.assertEqual(200, req.status_code) - - def test_api_password(self): - """ Test if we get access denied if we omit or provide - a wrong api password. """ - req = requests.get( - _url(remote.URL_API_STATES_ENTITY.format("test"))) - - self.assertEqual(401, req.status_code) - - req = requests.get( - _url(remote.URL_API_STATES_ENTITY.format("test")), - headers={remote.AUTH_HEADER: 'wrongpassword'}) - - self.assertEqual(401, req.status_code) - - def test_api_list_state_entities(self): - """ Test if the debug interface allows us to list state entities. """ - req = requests.get(_url(remote.URL_API_STATES), - headers=HA_HEADERS) - - remote_data = [ha.State.from_dict(item) for item in req.json()] - - self.assertEqual(self.hass.states.all(), remote_data) - - def test_api_get_state(self): - """ Test if the debug interface allows us to get a state. """ - req = requests.get( - _url(remote.URL_API_STATES_ENTITY.format("test.test")), - headers=HA_HEADERS) - - data = ha.State.from_dict(req.json()) - - state = self.hass.states.get("test.test") - - self.assertEqual(state.state, data.state) - self.assertEqual(state.last_changed, data.last_changed) - self.assertEqual(state.attributes, data.attributes) - - def test_api_get_non_existing_state(self): - """ Test if the debug interface allows us to get a state. """ - req = requests.get( - _url(remote.URL_API_STATES_ENTITY.format("does_not_exist")), - headers=HA_HEADERS) - - self.assertEqual(404, req.status_code) - - def test_api_state_change(self): - """ Test if we can change the state of an entity that exists. """ - - self.hass.states.set("test.test", "not_to_be_set") - - requests.post(_url(remote.URL_API_STATES_ENTITY.format("test.test")), - data=json.dumps({"state": "debug_state_change2", - "api_password": API_PASSWORD})) - - self.assertEqual("debug_state_change2", - self.hass.states.get("test.test").state) - - # pylint: disable=invalid-name - def test_api_state_change_of_non_existing_entity(self): - """ Test if the API allows us to change a state of - a non existing entity. """ - - new_state = "debug_state_change" - - req = requests.post( - _url(remote.URL_API_STATES_ENTITY.format( - "test_entity.that_does_not_exist")), - data=json.dumps({"state": new_state, - "api_password": API_PASSWORD})) - - cur_state = (self.hass.states. - get("test_entity.that_does_not_exist").state) - - self.assertEqual(201, req.status_code) - self.assertEqual(cur_state, new_state) - - # pylint: disable=invalid-name - def test_api_fire_event_with_no_data(self): - """ Test if the API allows us to fire an event. """ - test_value = [] - - def listener(event): # pylint: disable=unused-argument - """ Helper method that will verify our event got called. """ - test_value.append(1) - - self.hass.listen_once_event("test.event_no_data", listener) - - requests.post( - _url(remote.URL_API_EVENTS_EVENT.format("test.event_no_data")), - headers=HA_HEADERS) - - self.hass._pool.block_till_done() - - self.assertEqual(1, len(test_value)) - - # pylint: disable=invalid-name - def test_api_fire_event_with_data(self): - """ Test if the API allows us to fire an event. """ - test_value = [] - - def listener(event): # pylint: disable=unused-argument - """ Helper method that will verify that our event got called and - that test if our data came through. """ - if "test" in event.data: - test_value.append(1) - - self.hass.listen_once_event("test_event_with_data", listener) - - requests.post( - _url(remote.URL_API_EVENTS_EVENT.format("test_event_with_data")), - data=json.dumps({"test": 1}), - headers=HA_HEADERS) - - self.hass._pool.block_till_done() - - self.assertEqual(1, len(test_value)) - - # pylint: disable=invalid-name - def test_api_fire_event_with_invalid_json(self): - """ Test if the API allows us to fire an event. """ - test_value = [] - - def listener(event): # pylint: disable=unused-argument - """ Helper method that will verify our event got called. """ - test_value.append(1) - - self.hass.listen_once_event("test_event_bad_data", listener) - - req = requests.post( - _url(remote.URL_API_EVENTS_EVENT.format("test_event_bad_data")), - data=json.dumps('not an object'), - headers=HA_HEADERS) - - self.hass._pool.block_till_done() - - self.assertEqual(422, req.status_code) - self.assertEqual(0, len(test_value)) - - def test_api_get_event_listeners(self): - """ Test if we can get the list of events being listened for. """ - req = requests.get(_url(remote.URL_API_EVENTS), - headers=HA_HEADERS) - - local = self.hass.bus.listeners - - for event in req.json(): - self.assertEqual(event["listener_count"], - local.pop(event["event"])) - - self.assertEqual(0, len(local)) - - def test_api_get_services(self): - """ Test if we can get a dict describing current services. """ - req = requests.get(_url(remote.URL_API_SERVICES), - headers=HA_HEADERS) - - local_services = self.hass.services.services - - for serv_domain in req.json(): - local = local_services.pop(serv_domain["domain"]) - - self.assertEqual(local, serv_domain["services"]) - - def test_api_call_service_no_data(self): - """ Test if the API allows us to call a service. """ - test_value = [] - - def listener(service_call): # pylint: disable=unused-argument - """ Helper method that will verify that our service got called. """ - test_value.append(1) - - self.hass.services.register("test_domain", "test_service", listener) - - requests.post( - _url(remote.URL_API_SERVICES_SERVICE.format( - "test_domain", "test_service")), - headers=HA_HEADERS) - - self.hass._pool.block_till_done() - - self.assertEqual(1, len(test_value)) - - def test_api_call_service_with_data(self): - """ Test if the API allows us to call a service. """ - test_value = [] - - def listener(service_call): # pylint: disable=unused-argument - """ Helper method that will verify that our service got called and - that test if our data came through. """ - if "test" in service_call.data: - test_value.append(1) - - self.hass.services.register("test_domain", "test_service", listener) - - requests.post( - _url(remote.URL_API_SERVICES_SERVICE.format( - "test_domain", "test_service")), - data=json.dumps({"test": 1}), - headers=HA_HEADERS) - - self.hass._pool.block_till_done() - - self.assertEqual(1, len(test_value)) - - -class TestRemoteMethods(unittest.TestCase): - """ Test the homeassistant.remote module. """ - - @classmethod - def setUpClass(cls): # pylint: disable=invalid-name - """ things to be run when tests are started. """ - cls.hass = ensure_homeassistant_started() - - cls.api = remote.API("127.0.0.1", API_PASSWORD) - - def test_validate_api(self): - """ Test Python API validate_api. """ - self.assertEqual(remote.APIStatus.OK, remote.validate_api(self.api)) - - self.assertEqual(remote.APIStatus.INVALID_PASSWORD, - remote.validate_api( - remote.API("127.0.0.1", API_PASSWORD + "A"))) - - def test_get_event_listeners(self): - """ Test Python API get_event_listeners. """ - local_data = self.hass.bus.listeners - remote_data = remote.get_event_listeners(self.api) - - for event in remote_data: - self.assertEqual(local_data.pop(event["event"]), - event["listener_count"]) - - self.assertEqual(len(local_data), 0) - - def test_fire_event(self): - """ Test Python API fire_event. """ - test_value = [] - - def listener(event): # pylint: disable=unused-argument - """ Helper method that will verify our event got called. """ - test_value.append(1) - - self.hass.listen_once_event("test.event_no_data", listener) - - remote.fire_event(self.api, "test.event_no_data") - - self.hass._pool.block_till_done() - - self.assertEqual(1, len(test_value)) - - def test_get_state(self): - """ Test Python API get_state. """ - - self.assertEqual( - self.hass.states.get('test.test'), - remote.get_state(self.api, 'test.test')) - - def test_get_states(self): - """ Test Python API get_state_entity_ids. """ - - self.assertEqual( - remote.get_states(self.api), self.hass.states.all()) - - def test_set_state(self): - """ Test Python API set_state. """ - self.assertTrue(remote.set_state(self.api, 'test.test', 'set_test')) - - self.assertEqual('set_test', self.hass.states.get('test.test').state) - - def test_is_state(self): - """ Test Python API is_state. """ - - self.assertTrue( - remote.is_state(self.api, 'test.test', - self.hass.states.get('test.test').state)) - - def test_get_services(self): - """ Test Python API get_services. """ - - local_services = self.hass.services.services - - for serv_domain in remote.get_services(self.api): - local = local_services.pop(serv_domain["domain"]) - - self.assertEqual(local, serv_domain["services"]) - - def test_call_service(self): - """ Test Python API call_service. """ - test_value = [] - - def listener(service_call): # pylint: disable=unused-argument - """ Helper method that will verify that our service got called. """ - test_value.append(1) - - self.hass.services.register("test_domain", "test_service", listener) - - remote.call_service(self.api, "test_domain", "test_service") - - self.hass._pool.block_till_done() - - self.assertEqual(1, len(test_value)) - - -class TestRemoteClasses(unittest.TestCase): - """ Test the homeassistant.remote module. """ - - @classmethod - def setUpClass(cls): # pylint: disable=invalid-name - """ things to be run when tests are started. """ - cls.hass = ensure_homeassistant_started() - cls.slave = ensure_slave_started() - - def test_home_assistant_init(self): - """ Test HomeAssistant init. """ - self.assertRaises( - ha.HomeAssistantError, remote.HomeAssistant, - remote.API('127.0.0.1', API_PASSWORD + 'A', 8124)) - - def test_statemachine_init(self): - """ Tests if remote.StateMachine copies all states on init. """ - self.assertEqual(len(self.hass.states.all()), - len(self.slave.states.all())) - - for state in self.hass.states.all(): - self.assertEqual( - state, self.slave.states.get(state.entity_id)) - - def test_statemachine_set(self): - """ Tests if setting the state on a slave is recorded. """ - self.slave.states.set("remote.test", "remote.statemachine test") - - # Wait till slave tells master - self.slave._pool.block_till_done() - # Wait till master gives updated state - self.hass._pool.block_till_done() - - self.assertEqual("remote.statemachine test", - self.slave.states.get("remote.test").state) - - def test_eventbus_fire(self): - """ Test if events fired from the eventbus get fired. """ - test_value = [] - - def listener(event): # pylint: disable=unused-argument - """ Helper method that will verify our event got called. """ - test_value.append(1) - - self.slave.listen_once_event("test.event_no_data", listener) - - self.slave.bus.fire("test.event_no_data") - - # Wait till slave tells master - self.slave._pool.block_till_done() - # Wait till master gives updated event - self.hass._pool.block_till_done() - - self.assertEqual(1, len(test_value)) diff --git a/homeassistant/util.py b/homeassistant/util.py index 5f70e76947a..69cfd52b3cd 100644 --- a/homeassistant/util.py +++ b/homeassistant/util.py @@ -226,32 +226,58 @@ class ThreadPool(object): """ work_queue = self.work_queue = queue.PriorityQueue() current_jobs = self.current_jobs = [] + self.worker_count = worker_count self.busy_callback = busy_callback self.busy_warning_limit = worker_count**2 + self._lock = threading.RLock() + self._quit_task = object() for _ in range(worker_count): worker = threading.Thread(target=_threadpool_worker, args=(work_queue, current_jobs, - job_handler)) + job_handler, self._quit_task)) worker.daemon = True worker.start() + self.running = True + def add_job(self, priority, job): """ Add a job to be sent to the workers. """ - self.work_queue.put(PriorityQueueItem(priority, job)) + with self._lock: + if not self.running: + raise Exception("We are shutting down the ") - # check if our queue is getting too big - if self.work_queue.qsize() > self.busy_warning_limit \ - and self.busy_callback is not None: + self.work_queue.put(PriorityQueueItem(priority, job)) - # Increase limit we will issue next warning - self.busy_warning_limit *= 2 + # check if our queue is getting too big + if self.work_queue.qsize() > self.busy_warning_limit \ + and self.busy_callback is not None: - self.busy_callback(self.current_jobs, self.work_queue.qsize()) + # Increase limit we will issue next warning + self.busy_warning_limit *= 2 + + self.busy_callback(self.current_jobs, self.work_queue.qsize()) def block_till_done(self): """ Blocks till all work is done. """ - self.work_queue.join() + with self._lock: + self.work_queue.join() + + def stop(self): + """ Stops all the threads. """ + with self._lock: + # Clear the queue + while self.work_queue.qsize() > 0: + self.work_queue.get() + self.work_queue.task_done() + + # Tell the workers to quit + for i in range(self.worker_count): + self.add_job(1000, self._quit_task) + + self.running = False + + self.block_till_done() class PriorityQueueItem(object): @@ -266,12 +292,16 @@ class PriorityQueueItem(object): return self.priority < other.priority -def _threadpool_worker(work_queue, current_jobs, job_handler): +def _threadpool_worker(work_queue, current_jobs, job_handler, quit_task): """ Provides the base functionality of a worker for the thread pool. """ while True: # Get new item from work_queue job = work_queue.get().item + if job == quit_task: + work_queue.task_done() + return + # Add to current running jobs job_log = (datetime.datetime.now(), job) current_jobs.append(job_log) diff --git a/test/__init__.py b/test/__init__.py new file mode 100644 index 00000000000..0c08ea7c0e7 --- /dev/null +++ b/test/__init__.py @@ -0,0 +1,302 @@ +""" +homeassistant.test +~~~~~~~~~~~~~~~~~~ + +Provides tests to verify that Home Assistant modules do what they should do. + +""" +# pylint: disable=protected-access,too-many-public-methods +import os +import unittest +import time +import threading +from datetime import datetime + +import homeassistant as ha + + +class TestHomeAssistant(unittest.TestCase): + """ + Tests the Home Assistant core classes. + Currently only includes tests to test cases that do not + get tested in the API integration tests. + """ + + def setUp(self): # pylint: disable=invalid-name + """ things to be run when tests are started. """ + self.hass = ha.HomeAssistant() + self.hass.states.set("light.Bowl", "on") + self.hass.states.set("switch.AC", "off") + + def test_get_config_path(self): + """ Test get_config_path method. """ + self.assertEqual(os.getcwd(), self.hass.config_dir) + + self.assertEqual(os.path.join(os.getcwd(), "test.conf"), + self.hass.get_config_path("test.conf")) + + def test_block_till_stoped(self): + """ Test if we can block till stop service is called. """ + blocking_thread = threading.Thread(target=self.hass.block_till_stopped) + + self.assertFalse(blocking_thread.is_alive()) + + blocking_thread.start() + # Python will now give attention to the other thread + time.sleep(.01) + + self.assertTrue(blocking_thread.is_alive()) + + self.hass.call_service(ha.DOMAIN, ha.SERVICE_HOMEASSISTANT_STOP) + self.hass._pool.block_till_done() + + # hass.block_till_stopped checks every second if it should quit + # we have to wait worst case 1 second + wait_loops = 0 + while blocking_thread.is_alive() and wait_loops < 10: + wait_loops += 1 + time.sleep(0.1) + + self.assertFalse(blocking_thread.is_alive()) + + def test_get_entity_ids(self): + """ Test get_entity_ids method. """ + ent_ids = self.hass.get_entity_ids() + self.assertEqual(2, len(ent_ids)) + self.assertTrue('light.Bowl' in ent_ids) + self.assertTrue('switch.AC' in ent_ids) + + ent_ids = self.hass.get_entity_ids('light') + self.assertEqual(1, len(ent_ids)) + self.assertTrue('light.Bowl' in ent_ids) + + def test_track_state_change(self): + """ Test track_state_change. """ + # 2 lists to track how often our callbacks got called + specific_runs = [] + wildcard_runs = [] + + self.hass.track_state_change( + 'light.Bowl', lambda a, b, c: specific_runs.append(1), 'on', 'off') + + self.hass.track_state_change( + 'light.Bowl', lambda a, b, c: wildcard_runs.append(1), + ha.MATCH_ALL, ha.MATCH_ALL) + + # Set same state should not trigger a state change/listener + self.hass.states.set('light.Bowl', 'on') + self.hass._pool.block_till_done() + self.assertEqual(0, len(specific_runs)) + self.assertEqual(0, len(wildcard_runs)) + + # State change off -> on + self.hass.states.set('light.Bowl', 'off') + self.hass._pool.block_till_done() + self.assertEqual(1, len(specific_runs)) + self.assertEqual(1, len(wildcard_runs)) + + # State change off -> off + self.hass.states.set('light.Bowl', 'off', {"some_attr": 1}) + self.hass._pool.block_till_done() + self.assertEqual(1, len(specific_runs)) + self.assertEqual(2, len(wildcard_runs)) + + # State change off -> on + self.hass.states.set('light.Bowl', 'on') + self.hass._pool.block_till_done() + self.assertEqual(1, len(specific_runs)) + self.assertEqual(3, len(wildcard_runs)) + + def test_listen_once_event(self): + """ Test listen_once_event method. """ + runs = [] + + self.hass.listen_once_event('test_event', lambda x: runs.append(1)) + + self.hass.bus.fire('test_event') + self.hass._pool.block_till_done() + self.assertEqual(1, len(runs)) + + # Second time it should not increase runs + self.hass.bus.fire('test_event') + self.hass._pool.block_till_done() + self.assertEqual(1, len(runs)) + + def test_track_point_in_time(self): + """ Test track point in time. """ + before_birthday = datetime(1985, 7, 9, 12, 0, 0) + birthday_paulus = datetime(1986, 7, 9, 12, 0, 0) + after_birthday = datetime(1987, 7, 9, 12, 0, 0) + + runs = [] + + self.hass.track_point_in_time( + lambda x: runs.append(1), birthday_paulus) + + self._send_time_changed(before_birthday) + self.hass._pool.block_till_done() + self.assertEqual(0, len(runs)) + + self._send_time_changed(birthday_paulus) + self.hass._pool.block_till_done() + self.assertEqual(1, len(runs)) + + # A point in time tracker will only fire once, this should do nothing + self._send_time_changed(birthday_paulus) + self.hass._pool.block_till_done() + self.assertEqual(1, len(runs)) + + self.hass.track_point_in_time( + lambda x: runs.append(1), birthday_paulus) + + self._send_time_changed(after_birthday) + self.hass._pool.block_till_done() + self.assertEqual(2, len(runs)) + + def test_track_time_change(self): + """ Test tracking time change. """ + wildcard_runs = [] + specific_runs = [] + + self.hass.track_time_change(lambda x: wildcard_runs.append(1)) + self.hass.track_time_change( + lambda x: specific_runs.append(1), second=[0, 30]) + + self._send_time_changed(datetime(2014, 5, 24, 12, 0, 0)) + self.hass._pool.block_till_done() + self.assertEqual(1, len(specific_runs)) + self.assertEqual(1, len(wildcard_runs)) + + self._send_time_changed(datetime(2014, 5, 24, 12, 0, 15)) + self.hass._pool.block_till_done() + self.assertEqual(1, len(specific_runs)) + self.assertEqual(2, len(wildcard_runs)) + + self._send_time_changed(datetime(2014, 5, 24, 12, 0, 30)) + self.hass._pool.block_till_done() + self.assertEqual(2, len(specific_runs)) + self.assertEqual(3, len(wildcard_runs)) + + def _send_time_changed(self, now): + """ Send a time changed event. """ + self.hass.bus.fire(ha.EVENT_TIME_CHANGED, {ha.ATTR_NOW: now}) + + +class TestEvent(unittest.TestCase): + """ Test Event class. """ + def test_repr(self): + """ Test that repr method works. #MoreCoverage """ + self.assertEqual( + "", + str(ha.Event("TestEvent"))) + + self.assertEqual( + "", + str(ha.Event("TestEvent", + {"beer": "nice"}, + ha.EventOrigin.remote))) + + +class TestEventBus(unittest.TestCase): + """ Test EventBus methods. """ + + def setUp(self): # pylint: disable=invalid-name + """ things to be run when tests are started. """ + self.bus = ha.EventBus() + self.bus.listen('test_event', lambda x: len) + + def test_add_remove_listener(self): + """ Test remove_listener method. """ + old_count = len(self.bus.listeners) + + listener = lambda x: len + + self.bus.listen('test', listener) + + self.assertEqual(old_count + 1, len(self.bus.listeners)) + + # Try deleting a non registered listener, nothing should happen + self.bus.remove_listener('test', lambda x: len) + + # Remove listener + self.bus.remove_listener('test', listener) + self.assertEqual(old_count, len(self.bus.listeners)) + + # Try deleting listener while category doesn't exist either + self.bus.remove_listener('test', listener) + + +class TestState(unittest.TestCase): + """ Test EventBus methods. """ + + def test_init(self): + """ Test state.init """ + self.assertRaises( + ha.InvalidEntityFormatError, ha.State, + 'invalid_entity_format', 'test_state') + + def test_repr(self): + """ Test state.repr """ + self.assertEqual("", + str(ha.State( + "happy.happy", "on", + last_changed=datetime(1984, 12, 8, 12, 0, 0)))) + + self.assertEqual("", + str(ha.State("happy.happy", "on", {"brightness": 144}, + datetime(1984, 12, 8, 12, 0, 0)))) + + +class TestStateMachine(unittest.TestCase): + """ Test EventBus methods. """ + + def setUp(self): # pylint: disable=invalid-name + """ things to be run when tests are started. """ + self.bus = ha.EventBus() + self.states = ha.StateMachine(self.bus) + self.states.set("light.Bowl", "on") + self.states.set("switch.AC", "off") + + def test_is_state(self): + """ Test is_state method. """ + self.assertTrue(self.states.is_state('light.Bowl', 'on')) + self.assertFalse(self.states.is_state('light.Bowl', 'off')) + self.assertFalse(self.states.is_state('light.Non_existing', 'on')) + + def test_remove(self): + """ Test remove method. """ + self.assertTrue('light.Bowl' in self.states.entity_ids) + self.assertTrue(self.states.remove('light.Bowl')) + self.assertFalse('light.Bowl' in self.states.entity_ids) + + # If it does not exist, we should get False + self.assertFalse(self.states.remove('light.Bowl')) + + +class TestServiceCall(unittest.TestCase): + """ Test ServiceCall class. """ + def test_repr(self): + """ Test repr method. """ + self.assertEqual( + "", + str(ha.ServiceCall('homeassistant', 'start'))) + + self.assertEqual( + "", + str(ha.ServiceCall('homeassistant', 'start', {"fast": "yes"}))) + + +class TestServiceRegistry(unittest.TestCase): + """ Test EventBus methods. """ + + def setUp(self): # pylint: disable=invalid-name + """ things to be run when tests are started. """ + pool = ha.create_worker_pool() + self.bus = ha.EventBus(pool) + self.services = ha.ServiceRegistry(self.bus, pool) + self.services.register("test_domain", "test_service", lambda x: len) + + def test_has_service(self): + """ Test has_service method. """ + self.assertTrue( + self.services.has_service("test_domain", "test_service")) diff --git a/test/components/__init__.py b/test/components/__init__.py new file mode 100644 index 00000000000..32a6d5d32e6 --- /dev/null +++ b/test/components/__init__.py @@ -0,0 +1,70 @@ +""" +homeassistant.test +~~~~~~~~~~~~~~~~~~ + +Provides tests to verify that Home Assistant modules do what they should do. + +""" +# pylint: disable=protected-access,too-many-public-methods +import unittest + +import homeassistant as ha +import homeassistant.loader as loader +import homeassistant.components as comps + + +class TestComponentsCore(unittest.TestCase): + """ Tests homeassistant.components module. """ + + def setUp(self): # pylint: disable=invalid-name + """ Init needed objects. """ + self.hass = ha.HomeAssistant() + loader.prepare(self.hass) + self.assertTrue(comps.setup(self.hass, {})) + + self.hass.states.set('light.Bowl', comps.STATE_ON) + self.hass.states.set('light.Ceiling', comps.STATE_OFF) + + def test_is_on(self): + """ Test is_on method. """ + self.assertTrue(comps.is_on(self.hass, 'light.Bowl')) + self.assertFalse(comps.is_on(self.hass, 'light.Ceiling')) + self.assertTrue(comps.is_on(self.hass)) + + def test_turn_on(self): + """ Test turn_on method. """ + runs = [] + self.hass.services.register( + 'light', comps.SERVICE_TURN_ON, lambda x: runs.append(1)) + + comps.turn_on(self.hass, 'light.Ceiling') + + self.hass._pool.block_till_done() + + self.assertEqual(1, len(runs)) + + def test_turn_off(self): + """ Test turn_off method. """ + runs = [] + self.hass.services.register( + 'light', comps.SERVICE_TURN_OFF, lambda x: runs.append(1)) + + comps.turn_off(self.hass, 'light.Bowl') + + self.hass._pool.block_till_done() + + self.assertEqual(1, len(runs)) + + def test_extract_entity_ids(self): + """ Test extract_entity_ids method. """ + call = ha.ServiceCall('light', 'turn_on', + {comps.ATTR_ENTITY_ID: 'light.Bowl'}) + + self.assertEqual(['light.Bowl'], + comps.extract_entity_ids(self.hass, call)) + + call = ha.ServiceCall('light', 'turn_on', + {comps.ATTR_ENTITY_ID: ['light.Bowl']}) + + self.assertEqual(['light.Bowl'], + comps.extract_entity_ids(self.hass, call)) diff --git a/test/components/http.py b/test/components/http.py new file mode 100644 index 00000000000..22144704a52 --- /dev/null +++ b/test/components/http.py @@ -0,0 +1,252 @@ +""" +homeassistant.test +~~~~~~~~~~~~~~~~~~ + +Provides tests to verify that Home Assistant modules do what they should do. + +""" +# pylint: disable=protected-access,too-many-public-methods +import re +import unittest +import json + +import requests + +import homeassistant as ha +import homeassistant.remote as remote +import homeassistant.components.http as http + +from test.remote import _url, ensure_homeassistant_started + +API_PASSWORD = "test1234" + +HA_HEADERS = {remote.AUTH_HEADER: API_PASSWORD} + + +class TestHTTP(unittest.TestCase): + """ Test the HTTP debug interface and API. """ + + @classmethod + def setUpClass(cls): # pylint: disable=invalid-name + """ things to be run when tests are started. """ + cls.hass = ensure_homeassistant_started() + + def test_get_frontend(self): + """ Tests if we can get the frontend. """ + req = requests.get(_url("")) + + self.assertEqual(200, req.status_code) + + frontendjs = re.search( + r'(?P\/static\/frontend-[A-Za-z0-9]{32}.html)', + req.text).groups(0)[0] + + self.assertIsNotNone(frontendjs) + + req = requests.get(_url(frontendjs)) + + self.assertEqual(200, req.status_code) + + def test_api_password(self): + """ Test if we get access denied if we omit or provide + a wrong api password. """ + req = requests.get( + _url(remote.URL_API_STATES_ENTITY.format("test"))) + + self.assertEqual(401, req.status_code) + + req = requests.get( + _url(remote.URL_API_STATES_ENTITY.format("test")), + headers={remote.AUTH_HEADER: 'wrongpassword'}) + + self.assertEqual(401, req.status_code) + + def test_api_list_state_entities(self): + """ Test if the debug interface allows us to list state entities. """ + req = requests.get(_url(remote.URL_API_STATES), + headers=HA_HEADERS) + + remote_data = [ha.State.from_dict(item) for item in req.json()] + + self.assertEqual(self.hass.states.all(), remote_data) + + def test_api_get_state(self): + """ Test if the debug interface allows us to get a state. """ + req = requests.get( + _url(remote.URL_API_STATES_ENTITY.format("test.test")), + headers=HA_HEADERS) + + data = ha.State.from_dict(req.json()) + + state = self.hass.states.get("test.test") + + self.assertEqual(state.state, data.state) + self.assertEqual(state.last_changed, data.last_changed) + self.assertEqual(state.attributes, data.attributes) + + def test_api_get_non_existing_state(self): + """ Test if the debug interface allows us to get a state. """ + req = requests.get( + _url(remote.URL_API_STATES_ENTITY.format("does_not_exist")), + headers=HA_HEADERS) + + self.assertEqual(404, req.status_code) + + def test_api_state_change(self): + """ Test if we can change the state of an entity that exists. """ + + self.hass.states.set("test.test", "not_to_be_set") + + requests.post(_url(remote.URL_API_STATES_ENTITY.format("test.test")), + data=json.dumps({"state": "debug_state_change2", + "api_password": API_PASSWORD})) + + self.assertEqual("debug_state_change2", + self.hass.states.get("test.test").state) + + # pylint: disable=invalid-name + def test_api_state_change_of_non_existing_entity(self): + """ Test if the API allows us to change a state of + a non existing entity. """ + + new_state = "debug_state_change" + + req = requests.post( + _url(remote.URL_API_STATES_ENTITY.format( + "test_entity.that_does_not_exist")), + data=json.dumps({"state": new_state, + "api_password": API_PASSWORD})) + + cur_state = (self.hass.states. + get("test_entity.that_does_not_exist").state) + + self.assertEqual(201, req.status_code) + self.assertEqual(cur_state, new_state) + + # pylint: disable=invalid-name + def test_api_fire_event_with_no_data(self): + """ Test if the API allows us to fire an event. """ + test_value = [] + + def listener(event): # pylint: disable=unused-argument + """ Helper method that will verify our event got called. """ + test_value.append(1) + + self.hass.listen_once_event("test.event_no_data", listener) + + requests.post( + _url(remote.URL_API_EVENTS_EVENT.format("test.event_no_data")), + headers=HA_HEADERS) + + self.hass._pool.block_till_done() + + self.assertEqual(1, len(test_value)) + + # pylint: disable=invalid-name + def test_api_fire_event_with_data(self): + """ Test if the API allows us to fire an event. """ + test_value = [] + + def listener(event): # pylint: disable=unused-argument + """ Helper method that will verify that our event got called and + that test if our data came through. """ + if "test" in event.data: + test_value.append(1) + + self.hass.listen_once_event("test_event_with_data", listener) + + requests.post( + _url(remote.URL_API_EVENTS_EVENT.format("test_event_with_data")), + data=json.dumps({"test": 1}), + headers=HA_HEADERS) + + self.hass._pool.block_till_done() + + self.assertEqual(1, len(test_value)) + + # pylint: disable=invalid-name + def test_api_fire_event_with_invalid_json(self): + """ Test if the API allows us to fire an event. """ + test_value = [] + + def listener(event): # pylint: disable=unused-argument + """ Helper method that will verify our event got called. """ + test_value.append(1) + + self.hass.listen_once_event("test_event_bad_data", listener) + + req = requests.post( + _url(remote.URL_API_EVENTS_EVENT.format("test_event_bad_data")), + data=json.dumps('not an object'), + headers=HA_HEADERS) + + self.hass._pool.block_till_done() + + self.assertEqual(422, req.status_code) + self.assertEqual(0, len(test_value)) + + def test_api_get_event_listeners(self): + """ Test if we can get the list of events being listened for. """ + req = requests.get(_url(remote.URL_API_EVENTS), + headers=HA_HEADERS) + + local = self.hass.bus.listeners + + for event in req.json(): + self.assertEqual(event["listener_count"], + local.pop(event["event"])) + + self.assertEqual(0, len(local)) + + def test_api_get_services(self): + """ Test if we can get a dict describing current services. """ + req = requests.get(_url(remote.URL_API_SERVICES), + headers=HA_HEADERS) + + local_services = self.hass.services.services + + for serv_domain in req.json(): + local = local_services.pop(serv_domain["domain"]) + + self.assertEqual(local, serv_domain["services"]) + + def test_api_call_service_no_data(self): + """ Test if the API allows us to call a service. """ + test_value = [] + + def listener(service_call): # pylint: disable=unused-argument + """ Helper method that will verify that our service got called. """ + test_value.append(1) + + self.hass.services.register("test_domain", "test_service", listener) + + requests.post( + _url(remote.URL_API_SERVICES_SERVICE.format( + "test_domain", "test_service")), + headers=HA_HEADERS) + + self.hass._pool.block_till_done() + + self.assertEqual(1, len(test_value)) + + def test_api_call_service_with_data(self): + """ Test if the API allows us to call a service. """ + test_value = [] + + def listener(service_call): # pylint: disable=unused-argument + """ Helper method that will verify that our service got called and + that test if our data came through. """ + if "test" in service_call.data: + test_value.append(1) + + self.hass.services.register("test_domain", "test_service", listener) + + requests.post( + _url(remote.URL_API_SERVICES_SERVICE.format( + "test_domain", "test_service")), + data=json.dumps({"test": 1}), + headers=HA_HEADERS) + + self.hass._pool.block_till_done() + + self.assertEqual(1, len(test_value)) diff --git a/test/loader.py b/test/loader.py new file mode 100644 index 00000000000..5f13e5f87ad --- /dev/null +++ b/test/loader.py @@ -0,0 +1,24 @@ +""" +homeassistant.test +~~~~~~~~~~~~~~~~~~ + +Provides tests to verify that Home Assistant modules do what they should do. + +""" +# pylint: disable=too-many-public-methods +import unittest + +import homeassistant as ha +import homeassistant.loader as loader +import homeassistant.components.http as http + + +class TestLoader(unittest.TestCase): + """ Test the loader module. """ + def setUp(self): # pylint: disable=invalid-name + self.hass = ha.HomeAssistant() + loader.prepare(self.hass) + + def test_get_component(self): + """ Test if get_component works. """ + self.assertEqual(http, loader.get_component('http')) diff --git a/test/remote.py b/test/remote.py new file mode 100644 index 00000000000..c39adb20bac --- /dev/null +++ b/test/remote.py @@ -0,0 +1,225 @@ +""" +homeassistant.test +~~~~~~~~~~~~~~~~~~ + +Provides tests to verify that Home Assistant modules do what they should do. + +""" +# pylint: disable=protected-access,too-many-public-methods +import unittest + +import homeassistant as ha +import homeassistant.remote as remote +import homeassistant.components.http as http + +API_PASSWORD = "test1234" + +HTTP_BASE_URL = "http://127.0.0.1:{}".format(remote.SERVER_PORT) + +HA_HEADERS = {remote.AUTH_HEADER: API_PASSWORD} + + +def _url(path=""): + """ Helper method to generate urls. """ + return HTTP_BASE_URL + path + + +class HAHelper(object): # pylint: disable=too-few-public-methods + """ Helper class to keep track of current running HA instance. """ + hass = None + slave = None + + +def ensure_homeassistant_started(): + """ Ensures home assistant is started. """ + + if not HAHelper.hass: + print("Setting up new HA") + hass = ha.HomeAssistant() + + hass.bus.listen('test_event', lambda _: _) + hass.states.set('test.test', 'a_state') + + http.setup(hass, + {http.DOMAIN: {http.CONF_API_PASSWORD: API_PASSWORD}}) + + hass.start() + + HAHelper.hass = hass + + return HAHelper.hass + + +def ensure_slave_started(): + """ Ensure a home assistant slave is started. """ + + ensure_homeassistant_started() + + if not HAHelper.slave: + local_api = remote.API("127.0.0.1", API_PASSWORD, 8124) + remote_api = remote.API("127.0.0.1", API_PASSWORD) + slave = remote.HomeAssistant(remote_api, local_api) + + http.setup(slave, + {http.DOMAIN: {http.CONF_API_PASSWORD: API_PASSWORD, + http.CONF_SERVER_PORT: 8124}}) + + slave.start() + + HAHelper.slave = slave + + return HAHelper.slave + + +class TestRemoteMethods(unittest.TestCase): + """ Test the homeassistant.remote module. """ + + @classmethod + def setUpClass(cls): # pylint: disable=invalid-name + """ things to be run when tests are started. """ + cls.hass = ensure_homeassistant_started() + + cls.api = remote.API("127.0.0.1", API_PASSWORD) + + def test_validate_api(self): + """ Test Python API validate_api. """ + self.assertEqual(remote.APIStatus.OK, remote.validate_api(self.api)) + + self.assertEqual(remote.APIStatus.INVALID_PASSWORD, + remote.validate_api( + remote.API("127.0.0.1", API_PASSWORD + "A"))) + + def test_get_event_listeners(self): + """ Test Python API get_event_listeners. """ + local_data = self.hass.bus.listeners + remote_data = remote.get_event_listeners(self.api) + + for event in remote_data: + self.assertEqual(local_data.pop(event["event"]), + event["listener_count"]) + + self.assertEqual(len(local_data), 0) + + def test_fire_event(self): + """ Test Python API fire_event. """ + test_value = [] + + def listener(event): # pylint: disable=unused-argument + """ Helper method that will verify our event got called. """ + test_value.append(1) + + self.hass.listen_once_event("test.event_no_data", listener) + + remote.fire_event(self.api, "test.event_no_data") + + self.hass._pool.block_till_done() + + self.assertEqual(1, len(test_value)) + + def test_get_state(self): + """ Test Python API get_state. """ + + self.assertEqual( + self.hass.states.get('test.test'), + remote.get_state(self.api, 'test.test')) + + def test_get_states(self): + """ Test Python API get_state_entity_ids. """ + + self.assertEqual( + remote.get_states(self.api), self.hass.states.all()) + + def test_set_state(self): + """ Test Python API set_state. """ + self.assertTrue(remote.set_state(self.api, 'test.test', 'set_test')) + + self.assertEqual('set_test', self.hass.states.get('test.test').state) + + def test_is_state(self): + """ Test Python API is_state. """ + + self.assertTrue( + remote.is_state(self.api, 'test.test', + self.hass.states.get('test.test').state)) + + def test_get_services(self): + """ Test Python API get_services. """ + + local_services = self.hass.services.services + + for serv_domain in remote.get_services(self.api): + local = local_services.pop(serv_domain["domain"]) + + self.assertEqual(local, serv_domain["services"]) + + def test_call_service(self): + """ Test Python API call_service. """ + test_value = [] + + def listener(service_call): # pylint: disable=unused-argument + """ Helper method that will verify that our service got called. """ + test_value.append(1) + + self.hass.services.register("test_domain", "test_service", listener) + + remote.call_service(self.api, "test_domain", "test_service") + + self.hass._pool.block_till_done() + + self.assertEqual(1, len(test_value)) + + +class TestRemoteClasses(unittest.TestCase): + """ Test the homeassistant.remote module. """ + + @classmethod + def setUpClass(cls): # pylint: disable=invalid-name + """ things to be run when tests are started. """ + cls.hass = ensure_homeassistant_started() + cls.slave = ensure_slave_started() + + def test_home_assistant_init(self): + """ Test HomeAssistant init. """ + self.assertRaises( + ha.HomeAssistantError, remote.HomeAssistant, + remote.API('127.0.0.1', API_PASSWORD + 'A', 8124)) + + def test_statemachine_init(self): + """ Tests if remote.StateMachine copies all states on init. """ + self.assertEqual(len(self.hass.states.all()), + len(self.slave.states.all())) + + for state in self.hass.states.all(): + self.assertEqual( + state, self.slave.states.get(state.entity_id)) + + def test_statemachine_set(self): + """ Tests if setting the state on a slave is recorded. """ + self.slave.states.set("remote.test", "remote.statemachine test") + + # Wait till slave tells master + self.slave._pool.block_till_done() + # Wait till master gives updated state + self.hass._pool.block_till_done() + + self.assertEqual("remote.statemachine test", + self.slave.states.get("remote.test").state) + + def test_eventbus_fire(self): + """ Test if events fired from the eventbus get fired. """ + test_value = [] + + def listener(event): # pylint: disable=unused-argument + """ Helper method that will verify our event got called. """ + test_value.append(1) + + self.slave.listen_once_event("test.event_no_data", listener) + + self.slave.bus.fire("test.event_no_data") + + # Wait till slave tells master + self.slave._pool.block_till_done() + # Wait till master gives updated event + self.hass._pool.block_till_done() + + self.assertEqual(1, len(test_value)) diff --git a/test/util.py b/test/util.py new file mode 100644 index 00000000000..8c1c02e5c39 --- /dev/null +++ b/test/util.py @@ -0,0 +1,90 @@ +""" +homeassistant.test +~~~~~~~~~~~~~~~~~~ + +Provides tests to verify that Home Assistant modules do what they should do. + +""" +# pylint: disable=too-many-public-methods +import unittest +from datetime import datetime + +import homeassistant.util as util + + +class TestUtil(unittest.TestCase): + """ Tests util methods. """ + def test_sanitize_filename(self): + """ Test sanitize_filename. """ + self.assertEqual("test", util.sanitize_filename("test")) + self.assertEqual("test", util.sanitize_filename("/test")) + self.assertEqual("test", util.sanitize_filename("..test")) + self.assertEqual("test", util.sanitize_filename("\\test")) + self.assertEqual("test", util.sanitize_filename("\\../test")) + + def test_sanitize_path(self): + """ Test sanitize_path. """ + self.assertEqual("test/path", util.sanitize_path("test/path")) + self.assertEqual("test/path", util.sanitize_path("~test/path")) + self.assertEqual("//test/path", + util.sanitize_path("~/../test/path")) + + def test_slugify(self): + """ Test slugify. """ + self.assertEqual("Test", util.slugify("T-!@#$!#@$!$est")) + self.assertEqual("Test_More", util.slugify("Test More")) + self.assertEqual("Test_More", util.slugify("Test_(More)")) + + def test_datetime_to_str(self): + """ Test datetime_to_str. """ + self.assertEqual("12:00:00 09-07-1986", + util.datetime_to_str(datetime(1986, 7, 9, 12, 0, 0))) + + def test_str_to_datetime(self): + """ Test str_to_datetime. """ + self.assertEqual(datetime(1986, 7, 9, 12, 0, 0), + util.str_to_datetime("12:00:00 09-07-1986")) + + def test_split_entity_id(self): + """ Test split_entity_id. """ + self.assertEqual(['domain', 'object_id'], + util.split_entity_id('domain.object_id')) + + def test_repr_helper(self): + """ Test repr_helper. """ + self.assertEqual("A", util.repr_helper("A")) + self.assertEqual("5", util.repr_helper(5)) + self.assertEqual("True", util.repr_helper(True)) + self.assertEqual("test=1", + util.repr_helper({"test": 1})) + self.assertEqual("12:00:00 09-07-1986", + util.repr_helper(datetime(1986, 7, 9, 12, 0, 0))) + + # pylint: disable=invalid-name + def test_color_RGB_to_xy(self): + """ Test color_RGB_to_xy. """ + self.assertEqual((0, 0), util.color_RGB_to_xy(0, 0, 0)) + self.assertEqual((0.3127159072215825, 0.3290014805066623), + util.color_RGB_to_xy(255, 255, 255)) + + self.assertEqual((0.15001662234042554, 0.060006648936170214), + util.color_RGB_to_xy(0, 0, 255)) + + self.assertEqual((0.3, 0.6), util.color_RGB_to_xy(0, 255, 0)) + + self.assertEqual((0.6400744994567747, 0.3299705106316933), + util.color_RGB_to_xy(255, 0, 0)) + + def test_convert(self): + """ Test convert. """ + self.assertEqual(5, util.convert("5", int)) + self.assertEqual(5.0, util.convert("5", float)) + self.assertEqual(True, util.convert("True", bool)) + self.assertEqual(1, util.convert("NOT A NUMBER", int, 1)) + self.assertEqual(1, util.convert(None, int, 1)) + + def test_ensure_unique_string(self): + """ Test ensure_unique_string. """ + self.assertEqual( + "Beer_3", + util.ensure_unique_string("Beer", ["Beer", "Beer_2"]))