Code according to PEP8 standard

This commit is contained in:
Paulus Schoutsen 2013-11-10 16:46:48 -08:00
parent 9c9b00c2d4
commit 483546a31d
8 changed files with 294 additions and 269 deletions

View File

@ -19,7 +19,7 @@ EVENT_START = "start"
EVENT_STATE_CHANGED = "state_changed"
EVENT_TIME_CHANGED = "time_changed"
TIMER_INTERVAL = 10 # seconds
TIMER_INTERVAL = 10 # seconds
# We want to be able to fire every time a minute starts (seconds=0).
# We want this so other modules can use that to make sure they fire
@ -28,6 +28,7 @@ assert 60 % TIMER_INTERVAL == 0, "60 % TIMER_INTERVAL should be 0!"
DATE_STR_FORMAT = "%H:%M:%S %d-%m-%Y"
def start_home_assistant(eventbus):
""" Start home assistant. """
Timer(eventbus)
@ -41,6 +42,7 @@ def start_home_assistant(eventbus):
except KeyboardInterrupt:
break
def datetime_to_str(dattim):
""" Converts datetime to a string format.
@ -48,6 +50,7 @@ def datetime_to_str(dattim):
"""
return dattim.strftime(DATE_STR_FORMAT)
def str_to_datetime(dt_str):
""" Converts a string to a datetime object.
@ -55,6 +58,7 @@ def str_to_datetime(dt_str):
"""
return datetime.strptime(dt_str, DATE_STR_FORMAT)
def ensure_list(parameter):
""" Wraps parameter in a list if it is not one and returns it.
@ -62,6 +66,7 @@ def ensure_list(parameter):
"""
return parameter if isinstance(parameter, list) else [parameter]
def matcher(subject, pattern):
""" Returns True if subject matches the pattern.
@ -70,6 +75,7 @@ def matcher(subject, pattern):
"""
return '*' in pattern or subject in pattern
def create_state(state, attributes=None, last_changed=None):
""" Creates a new state and initializes defaults where necessary. """
attributes = attributes or {}
@ -79,6 +85,7 @@ def create_state(state, attributes=None, last_changed=None):
'attributes': attributes,
'last_changed': datetime_to_str(last_changed)}
def track_state_change(eventbus, category, from_state, to_state, action):
""" Helper method to track specific state changes. """
from_state = ensure_list(from_state)
@ -96,6 +103,7 @@ def track_state_change(eventbus, category, from_state, to_state, action):
eventbus.listen(EVENT_STATE_CHANGED, listener)
# pylint: disable=too-many-arguments
def track_time_change(eventbus, action,
year='*', month='*', day='*',
@ -111,12 +119,12 @@ def track_time_change(eventbus, action,
now = str_to_datetime(event.data['now'])
if (point_in_time and now > point_in_time) or \
(not point_in_time and \
matcher(now.year, year) and \
matcher(now.month, month) and \
matcher(now.day, day) and \
matcher(now.hour, hour) and \
matcher(now.minute, minute) and \
(not point_in_time and
matcher(now.year, year) and
matcher(now.month, month) and
matcher(now.day, day) and
matcher(now.hour, hour) and
matcher(now.minute, minute) and
matcher(now.second, second)):
# point_in_time are exact points in time
@ -128,8 +136,10 @@ def track_time_change(eventbus, action,
eventbus.listen(EVENT_TIME_CHANGED, listener)
Event = namedtuple("Event", ["eventbus", "event_type", "data"])
class EventBus(object):
""" Class that allows code to listen for- and fire events. """
@ -140,9 +150,9 @@ class EventBus(object):
@property
def listeners(self):
""" List of events that is being listened for. """
return { key: len(self._listeners[key])
for key in self._listeners.keys()
if len(self._listeners[key]) > 0 }
return {key: len(self._listeners[key])
for key in self._listeners.keys()
if len(self._listeners[key]) > 0}
def fire(self, event_type, event_data=None):
""" Fire an event. """
@ -151,7 +161,7 @@ class EventBus(object):
event_data = {}
self.logger.info("EventBus:Event {}: {}".format(
event_type, event_data))
event_type, event_data))
def run():
""" Fire listeners for event. """
@ -160,11 +170,11 @@ class EventBus(object):
# We do not use itertools.chain() because some listeners might
# choose to remove themselves as a listener while being executed
for listener in self._listeners[ALL_EVENTS] + \
self._listeners[event.event_type]:
self._listeners[event.event_type]:
try:
listener(event)
except Exception: #pylint: disable=broad-except
except Exception: # pylint: disable=broad-except
self.logger.exception("EventBus:Exception in listener")
# We dont want the eventbus to be blocking - run in a thread.
@ -206,6 +216,7 @@ class EventBus(object):
except ValueError:
pass
class StateMachine(object):
""" Helper class that tracks the state of different categories. """
@ -237,14 +248,14 @@ class StateMachine(object):
old_state = self.states[category]
if old_state['state'] != new_state or \
old_state['attributes'] != attributes:
old_state['attributes'] != attributes:
self.states[category] = create_state(new_state, attributes)
self.eventbus.fire(EVENT_STATE_CHANGED,
{'category':category,
'old_state':old_state,
'new_state':self.states[category]})
{'category': category,
'old_state': old_state,
'new_state': self.states[category]})
self.lock.release()
@ -263,6 +274,7 @@ class StateMachine(object):
return cur_state and cur_state['state'] == state
class Timer(threading.Thread):
""" Timer will sent out an event every TIMER_INTERVAL seconds. """
@ -290,10 +302,10 @@ class Timer(threading.Thread):
now = datetime.now()
if now.second % TIMER_INTERVAL > 0 or \
now.second == last_fired_on_second:
now.second == last_fired_on_second:
slp_seconds = TIMER_INTERVAL - now.second % TIMER_INTERVAL + \
.5 - now.microsecond/1000000.0
.5 - now.microsecond/1000000.0
time.sleep(slp_seconds)
@ -302,9 +314,8 @@ class Timer(threading.Thread):
last_fired_on_second = now.second
self.eventbus.fire(EVENT_TIME_CHANGED,
{'now': datetime_to_str(now)})
{'now': datetime_to_str(now)})
class HomeAssistantException(Exception):
""" General Home Assistant exception occured. """

View File

@ -35,6 +35,7 @@ EVENT_KEYBOARD_MEDIA_PLAY_PAUSE = "keyboard.media_play_pause"
EVENT_KEYBOARD_MEDIA_NEXT_TRACK = "keyboard.media_next_track"
EVENT_KEYBOARD_MEDIA_PREV_TRACK = "keyboard.media_prev_track"
def _hue_process_transition_time(transition_seconds):
""" Transition time is in 1/10th seconds
and cannot exceed MAX_TRANSITION_TIME. """
@ -56,21 +57,21 @@ class LightTrigger(object):
self.logger = logging.getLogger(__name__)
# Track home coming of each seperate device
for category in device_tracker.device_state_categories():
for category in device_tracker.device_state_categories:
ha.track_state_change(eventbus, category,
DEVICE_STATE_NOT_HOME, DEVICE_STATE_HOME,
self._handle_device_state_change)
DEVICE_STATE_NOT_HOME, DEVICE_STATE_HOME,
self._handle_device_state_change)
# Track when all devices are gone to shut down lights
ha.track_state_change(eventbus, STATE_CATEGORY_ALL_DEVICES,
DEVICE_STATE_HOME, DEVICE_STATE_NOT_HOME,
self._handle_device_state_change)
DEVICE_STATE_HOME, DEVICE_STATE_NOT_HOME,
self._handle_device_state_change)
# Track every time sun rises so we can schedule a time-based
# pre-sun set event
ha.track_state_change(eventbus, STATE_CATEGORY_SUN,
SUN_STATE_BELOW_HORIZON, SUN_STATE_ABOVE_HORIZON,
self._handle_sun_rising)
SUN_STATE_BELOW_HORIZON, SUN_STATE_ABOVE_HORIZON,
self._handle_sun_rising)
# If the sun is already above horizon
# schedule the time-based pre-sun set event
@ -107,8 +108,8 @@ class LightTrigger(object):
for index, light_id in enumerate(self.light_control.light_ids):
ha.track_time_change(self.eventbus, turn_on(light_id),
point_in_time=start_point +
index * LIGHT_TRANSITION_TIME)
point_in_time=(start_point +
index * LIGHT_TRANSITION_TIME))
def _turn_light_on_before_sunset(self, light_id=None):
""" Helper function to turn on lights slowly if there
@ -125,11 +126,11 @@ class LightTrigger(object):
light_needed = (not lights_are_on and
self.statemachine.is_state(STATE_CATEGORY_SUN,
SUN_STATE_BELOW_HORIZON))
SUN_STATE_BELOW_HORIZON))
# Specific device came home ?
if (category != STATE_CATEGORY_ALL_DEVICES and
new_state['state'] == DEVICE_STATE_HOME):
new_state['state'] == DEVICE_STATE_HOME):
# These variables are needed for the elif check
now = datetime.now()
@ -140,7 +141,7 @@ class LightTrigger(object):
self.logger.info(
"Home coming event for {}. Turning lights on".
format(category))
format(category))
self.light_control.turn_light_on()
@ -162,7 +163,6 @@ class LightTrigger(object):
# will all the following then, break.
break
# Did all devices leave the house?
elif (category == STATE_CATEGORY_ALL_DEVICES and
new_state['state'] == DEVICE_STATE_NOT_HOME and lights_are_on):
@ -176,7 +176,7 @@ class LightTrigger(object):
state = self.statemachine.get_state(STATE_CATEGORY_SUN)
return ha.str_to_datetime(
state['attributes'][STATE_ATTRIBUTE_NEXT_SUN_SETTING])
state['attributes'][STATE_ATTRIBUTE_NEXT_SUN_SETTING])
def _time_for_light_before_sun_set(self):
""" Helper method to calculate the point in time we have to start
@ -185,7 +185,7 @@ class LightTrigger(object):
"""
return (self._next_sun_setting() -
LIGHT_TRANSITION_TIME * len(self.light_control.light_ids))
LIGHT_TRANSITION_TIME * len(self.light_control.light_ids))
class HueLightControl(object):
@ -195,8 +195,8 @@ class HueLightControl(object):
try:
import phue
except ImportError:
logging.getLogger(__name__).exception(("HueLightControl:"
"Error while importing dependency phue."))
logging.getLogger(__name__).exception(
"HueLightControl: Error while importing dependency phue.")
self.success_init = False
@ -208,7 +208,6 @@ class HueLightControl(object):
self.success_init = True
def is_light_on(self, light_id=None):
""" Returns if specified or all light are on. """
if not light_id:
@ -217,21 +216,19 @@ class HueLightControl(object):
else:
return self.bridge.get_light(light_id, 'on')
def turn_light_on(self, light_id=None, transition_seconds=None):
""" Turn the specified or all lights on. """
if not light_id:
light_id = self.light_ids
command = {'on': True, 'xy': [0.5119, 0.4147], 'bri':164}
command = {'on': True, 'xy': [0.5119, 0.4147], 'bri': 164}
if transition_seconds:
command['transitiontime'] = _hue_process_transition_time(
transition_seconds)
command['transitiontime'] = \
_hue_process_transition_time(transition_seconds)
self.bridge.set_light(light_id, command)
def turn_light_off(self, light_id=None, transition_seconds=None):
""" Turn the specified or all lights off. """
if not light_id:
@ -240,8 +237,8 @@ class HueLightControl(object):
command = {'on': False}
if transition_seconds:
command['transitiontime'] = _hue_process_transition_time(
transition_seconds)
command['transitiontime'] = \
_hue_process_transition_time(transition_seconds)
self.bridge.set_light(light_id, command)
@ -307,13 +304,13 @@ def setup_file_downloader(eventbus, download_path):
except requests.exceptions.ConnectionError:
logger.exception("FileDownloader:ConnectionError occured for {}".
format(event.data['url']))
format(event.data['url']))
eventbus.listen(EVENT_DOWNLOAD_FILE, download_file)
return True
def setup_webbrowser(eventbus):
""" Listen for browse_url events and open
the url in the default webbrowser. """
@ -321,32 +318,37 @@ def setup_webbrowser(eventbus):
import webbrowser
eventbus.listen(EVENT_BROWSE_URL,
lambda event: webbrowser.open(event.data['url']))
lambda event: webbrowser.open(event.data['url']))
return True
def setup_chromecast(eventbus, host):
""" Listen for chromecast events. """
from homeassistant.packages import pychromecast
eventbus.listen("start_fireplace",
lambda event: pychromecast.play_youtube_video(host, "eyU3bRy2x44"))
lambda event:
pychromecast.play_youtube_video(host, "eyU3bRy2x44"))
eventbus.listen("start_epic_sax",
lambda event: pychromecast.play_youtube_video(host, "kxopViU98Xo"))
lambda event:
pychromecast.play_youtube_video(host, "kxopViU98Xo"))
eventbus.listen(EVENT_CHROMECAST_YOUTUBE_VIDEO,
lambda event: pychromecast.play_youtube_video(host, event.data['video']))
lambda event:
pychromecast.play_youtube_video(host, event.data['video']))
return True
def setup_media_buttons(eventbus):
""" Listen for keyboard events. """
try:
import pykeyboard
except ImportError:
logging.getLogger(__name__).exception(("MediaButtons:"
"Error while importing dependency PyUserInput."))
logging.getLogger(__name__).exception(
"MediaButtons: Error while importing dependency PyUserInput.")
return False
@ -354,21 +356,27 @@ def setup_media_buttons(eventbus):
keyboard.special_key_assignment()
eventbus.listen(EVENT_KEYBOARD_VOLUME_UP,
lambda event: keyboard.tap_key(keyboard.volume_up_key))
lambda event:
keyboard.tap_key(keyboard.volume_up_key))
eventbus.listen(EVENT_KEYBOARD_VOLUME_DOWN,
lambda event: keyboard.tap_key(keyboard.volume_down_key))
lambda event:
keyboard.tap_key(keyboard.volume_down_key))
eventbus.listen(EVENT_KEYBOARD_VOLUME_MUTE,
lambda event: keyboard.tap_key(keyboard.volume_mute_key))
lambda event:
keyboard.tap_key(keyboard.volume_mute_key))
eventbus.listen(EVENT_KEYBOARD_MEDIA_PLAY_PAUSE,
lambda event: keyboard.tap_key(keyboard.media_play_pause_key))
lambda event:
keyboard.tap_key(keyboard.media_play_pause_key))
eventbus.listen(EVENT_KEYBOARD_MEDIA_NEXT_TRACK,
lambda event: keyboard.tap_key(keyboard.media_next_track_key))
lambda event:
keyboard.tap_key(keyboard.media_next_track_key))
eventbus.listen(EVENT_KEYBOARD_MEDIA_PREV_TRACK,
lambda event: keyboard.tap_key(keyboard.media_prev_track_key))
lambda event:
keyboard.tap_key(keyboard.media_prev_track_key))
return True

View File

@ -10,6 +10,7 @@ import homeassistant.observers as observers
import homeassistant.actors as actors
import homeassistant.httpinterface as httpinterface
# pylint: disable=too-many-branches
def from_config_file(config_path):
""" Starts home assistant with all possible functionality
@ -28,15 +29,15 @@ def from_config_file(config_path):
# Init observers
# Device scanner
if config.has_option('tomato', 'host') and \
config.has_option('tomato', 'username') and \
config.has_option('tomato', 'password') and \
config.has_option('tomato', 'http_id'):
config.has_option('tomato', 'username') and \
config.has_option('tomato', 'password') and \
config.has_option('tomato', 'http_id'):
device_scanner = observers.TomatoDeviceScanner(
config.get('tomato','host'),
config.get('tomato','username'),
config.get('tomato','password'),
config.get('tomato','http_id'))
config.get('tomato', 'host'),
config.get('tomato', 'username'),
config.get('tomato', 'password'),
config.get('tomato', 'http_id'))
if device_scanner.success_init:
statusses.append(("Device Scanner - Tomato", True))
@ -49,28 +50,27 @@ def from_config_file(config_path):
else:
device_scanner = None
# Device Tracker
if device_scanner:
device_tracker = observers.DeviceTracker(eventbus, statemachine,
device_scanner)
device_tracker = observers.DeviceTracker(
eventbus, statemachine, device_scanner)
statusses.append(("Device Tracker", True))
else:
device_tracker = None
# Sun tracker
if config.has_option("common", "latitude") and \
config.has_option("common", "longitude"):
config.has_option("common", "longitude"):
statusses.append(("Weather - Ephem",
observers.track_sun(eventbus, statemachine,
config.get("common","latitude"),
config.get("common","longitude"))))
observers.track_sun(
eventbus, statemachine,
config.get("common", "latitude"),
config.get("common", "longitude"))))
# --------------------------
# Init actors
# Light control
if config.has_section("hue"):
@ -84,7 +84,6 @@ def from_config_file(config_path):
else:
light_control = None
# Light trigger
if light_control:
actors.LightTrigger(eventbus, statemachine,
@ -92,28 +91,26 @@ def from_config_file(config_path):
statusses.append(("Light Trigger", True))
if config.has_option("chromecast", "host"):
statusses.append(("Chromecast", actors.setup_chromecast(eventbus,
config.get("chromecast", "host"))))
statusses.append(("Chromecast",
actors.setup_chromecast(
eventbus, config.get("chromecast", "host"))))
if config.has_option("downloader", "download_dir"):
result = actors.setup_file_downloader(eventbus,
config.get("downloader", "download_dir"))
result = actors.setup_file_downloader(
eventbus, config.get("downloader", "download_dir"))
statusses.append(("Downloader", result))
statusses.append(("Webbrowser", actors.setup_webbrowser(eventbus)))
statusses.append(("Media Buttons", actors.setup_media_buttons(eventbus)))
# Init HTTP interface
if config.has_option("httpinterface", "api_password"):
httpinterface.HTTPInterface(eventbus, statemachine,
config.get("httpinterface","api_password"))
httpinterface.HTTPInterface(
eventbus, statemachine,
config.get("httpinterface", "api_password"))
statusses.append(("HTTPInterface", True))

View File

@ -100,6 +100,7 @@ URL_API_EVENTS_EVENT = "/api/events/{}"
URL_STATIC = "/static/{}"
class HTTPInterface(threading.Thread):
""" Provides an HTTP interface for Home Assistant. """
@ -133,37 +134,38 @@ class HTTPInterface(threading.Thread):
self.server.serve_forever()
class RequestHandler(BaseHTTPRequestHandler):
""" Handles incoming HTTP requests """
PATHS = [ # debug interface
('GET', '/', '_handle_get_root'),
('POST', re.compile(r'/change_state'), '_handle_change_state'),
('POST', re.compile(r'/fire_event'), '_handle_fire_event'),
PATHS = [ # debug interface
('GET', '/', '_handle_get_root'),
('POST', re.compile(r'/change_state'), '_handle_change_state'),
('POST', re.compile(r'/fire_event'), '_handle_fire_event'),
# /states
('GET', '/api/states', '_handle_get_api_states'),
('GET',
re.compile(r'/api/states/(?P<category>[a-zA-Z\._0-9]+)'),
'_handle_get_api_states_category'),
('POST',
re.compile(r'/api/states/(?P<category>[a-zA-Z\._0-9]+)'),
'_handle_change_state'),
# /states
('GET', '/api/states', '_handle_get_api_states'),
('GET',
re.compile(r'/api/states/(?P<category>[a-zA-Z\._0-9]+)'),
'_handle_get_api_states_category'),
('POST',
re.compile(r'/api/states/(?P<category>[a-zA-Z\._0-9]+)'),
'_handle_change_state'),
# /events
('GET', '/api/events', '_handle_get_api_events'),
('POST',
re.compile(r'/api/events/(?P<event_type>[a-zA-Z\._0-9]+)'),
'_handle_fire_event'),
# /events
('GET', '/api/events', '_handle_get_api_events'),
('POST',
re.compile(r'/api/events/(?P<event_type>[a-zA-Z\._0-9]+)'),
'_handle_fire_event'),
# Statis files
('GET', re.compile(r'/static/(?P<file>[a-zA-Z\._\-0-9/]+)'),
'_handle_get_static')
]
# Statis files
('GET', re.compile(r'/static/(?P<file>[a-zA-Z\._\-0-9/]+)'),
'_handle_get_static')
]
use_json = False
def _handle_request(self, method): # pylint: disable=too-many-branches
def _handle_request(self, method): # pylint: disable=too-many-branches
""" Does some common checks and calls appropriate method. """
url = urlparse(self.path)
@ -201,7 +203,6 @@ class RequestHandler(BaseHTTPRequestHandler):
# pylint: disable=maybe-no-member
path_match = t_path.match(url.path)
if path_match and method == t_method:
# Call the method
handle_request_method = getattr(self, t_handler)
@ -210,7 +211,6 @@ class RequestHandler(BaseHTTPRequestHandler):
elif path_match:
path_matched_but_not_method = True
# Did we find a handler for the incoming request?
if handle_request_method:
@ -226,11 +226,11 @@ class RequestHandler(BaseHTTPRequestHandler):
else:
self.send_response(HTTP_NOT_FOUND)
def do_GET(self): # pylint: disable=invalid-name
def do_GET(self): # pylint: disable=invalid-name
""" GET request handler. """
self._handle_request('GET')
def do_POST(self): # pylint: disable=invalid-name
def do_POST(self): # pylint: disable=invalid-name
""" POST request handler. """
self._handle_request('POST')
@ -241,11 +241,12 @@ class RequestHandler(BaseHTTPRequestHandler):
return True
elif self.use_json:
self._message("API password missing or incorrect.",
HTTP_UNAUTHORIZED)
self._message(
"API password missing or incorrect.", HTTP_UNAUTHORIZED)
else:
self.send_response(HTTP_OK)
self.send_header('Content-type','text/html')
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write((
@ -280,7 +281,7 @@ class RequestHandler(BaseHTTPRequestHandler):
write = lambda txt: self.wfile.write(txt+"\n")
self.send_response(HTTP_OK)
self.send_header('Content-type','text/html')
self.send_header('Content-type', 'text/html')
self.end_headers()
write(("<html>"
@ -297,8 +298,9 @@ class RequestHandler(BaseHTTPRequestHandler):
# Flash message support
if self.server.flash_message:
write(("<div class='row'><div class='alert alert-success'>"
"{}</div></div>").format(self.server.flash_message))
write(("<div class='row'><div class='col-xs-12'>"
"<div class='alert alert-success'>"
"{}</div></div></div>").format(self.server.flash_message))
self.server.flash_message = None
@ -320,7 +322,7 @@ class RequestHandler(BaseHTTPRequestHandler):
for category in \
sorted(self.server.statemachine.categories,
key=lambda key: key.lower()):
key=lambda key: key.lower()):
categories.append(category)
@ -332,12 +334,11 @@ class RequestHandler(BaseHTTPRequestHandler):
write(("<tr>"
"<td>{}</td><td>{}</td><td>{}</td><td>{}</td>"
"</tr>").
format(category,
state['state'],
attributes,
state['last_changed']))
"</tr>").format(
category,
state['state'],
attributes,
state['last_changed']))
# Change state form
write(("<tr><td><input name='category' class='form-control' "
@ -354,8 +355,6 @@ class RequestHandler(BaseHTTPRequestHandler):
"</div></div>"))
# Describe event bus:
write(("<div class='row'>"
"<div class='col-xs-6'>"
@ -365,51 +364,49 @@ class RequestHandler(BaseHTTPRequestHandler):
"<table class='table'>"
"<tr><th>Event Type</th><th>Listeners</th></tr>"))
for event_type, count in sorted(self.server.eventbus.listeners.items()):
for event_type, count in sorted(
self.server.eventbus.listeners.items()):
write("<tr><td>{}</td><td>{}</td></tr>".format(event_type, count))
write(( "</table></div></div>"
write(("</table></div></div>"
"<div class='col-xs-6'>"
"<div class='col-xs-6'>"
"<div class='panel panel-primary'>"
"<div class='panel-heading'><h2 class='panel-title'>"
"Fire Event</h2></div>"
"<div class='panel-body'>"
"<form method='post' action='/fire_event' "
"class='form-horizontal form-fire-event'>"
"<input type='hidden' name='api_password' value='{}'>"
"<div class='panel-body'>"
"<form method='post' action='/fire_event' "
"class='form-horizontal form-fire-event'>"
"<input type='hidden' name='api_password' value='{}'>"
"<div class='form-group'>"
"<label for='event_type' class='col-xs-3 control-label'>"
"Event type</label>"
"<div class='col-xs-9'>"
"<input type='text' class='form-control' id='event_type'"
" name='event_type' placeholder='Event Type'>"
"</div>"
"</div>"
"<div class='form-group'>"
"<label for='event_type' class='col-xs-3 control-label'>"
"Event type</label>"
"<div class='col-xs-9'>"
"<input type='text' class='form-control' id='event_type'"
" name='event_type' placeholder='Event Type'>"
"</div>"
"</div>"
"<div class='form-group'>"
"<label for='event_data' class='col-xs-3 control-label'>"
"Event data</label>"
"<div class='col-xs-9'>"
"<textarea rows='3' class='form-control' id='event_data'"
" name='event_data' placeholder='Event Data "
"(JSON, optional)'></textarea>"
"</div>"
"</div>"
"<div class='form-group'>"
"<label for='event_data' class='col-xs-3 control-label'>"
"Event data</label>"
"<div class='col-xs-9'>"
"<textarea rows='3' class='form-control' id='event_data'"
" name='event_data' placeholder='Event Data "
"(JSON, optional)'></textarea>"
"</div>"
"</div>"
"<div class='form-group'>"
"<div class='col-xs-offset-3 col-xs-9'>"
"<button type='submit' class='btn btn-default'>"
"Fire Event</button>"
"</div>"
"</div>"
"</form>"
"</div></div></div>"
"</div>").format(self.server.api_password))
"<div class='form-group'>"
"<div class='col-xs-offset-3 col-xs-9'>"
"<button type='submit' class='btn btn-default'>"
"Fire Event</button>"
"</div>"
"</div>"
"</form>"
"</div></div></div>"
"</div>").format(self.server.api_password))
write("</div></body></html>")
@ -448,20 +445,21 @@ class RequestHandler(BaseHTTPRequestHandler):
state['category'] = category
self._write_json(state, status_code=HTTP_CREATED,
location=URL_API_STATES_CATEGORY.format(category))
location=
URL_API_STATES_CATEGORY.format(category))
else:
self._message("State of {} changed to {}".format(
category, new_state))
self._message(
"State of {} changed to {}".format(category, new_state))
except KeyError:
# If new_state don't exist in post data
self._message("No new_state submitted.",
HTTP_BAD_REQUEST)
self._message(
"No new_state submitted.", HTTP_BAD_REQUEST)
except ValueError:
# Occurs during error parsing json
self._message("Invalid JSON for attributes",
HTTP_UNPROCESSABLE_ENTITY)
self._message(
"Invalid JSON for attributes", HTTP_UNPROCESSABLE_ENTITY)
# pylint: disable=invalid-name
def _handle_fire_event(self, path_match, data):
@ -494,8 +492,8 @@ class RequestHandler(BaseHTTPRequestHandler):
except ValueError:
# Occurs during error parsing json
self._message("Invalid JSON for event_data",
HTTP_UNPROCESSABLE_ENTITY)
self._message(
"Invalid JSON for event_data", HTTP_UNPROCESSABLE_ENTITY)
# pylint: disable=unused-argument
def _handle_get_api_states(self, path_match, data):
@ -560,14 +558,17 @@ class RequestHandler(BaseHTTPRequestHandler):
def _redirect(self, location):
""" Helper method to redirect caller. """
self.send_response(HTTP_MOVED_PERMANENTLY)
self.send_header("Location", "{}?api_password={}".
format(location, self.server.api_password))
self.send_header(
"Location", "{}?api_password={}".format(
location, self.server.api_password))
self.end_headers()
def _write_json(self, data=None, status_code=HTTP_OK, location=None):
""" Helper method to return JSON to the caller. """
self.send_response(status_code)
self.send_header('Content-type','application/json')
self.send_header('Content-type', 'application/json')
if location:
self.send_header('Location', location)

View File

@ -52,7 +52,7 @@ def track_sun(eventbus, statemachine, latitude, longitude):
logger.exception("TrackSun:Error while importing dependency ephem.")
return False
sun = ephem.Sun() # pylint: disable=no-member
sun = ephem.Sun() # pylint: disable=no-member
def update_sun_state(now): # pylint: disable=unused-argument
""" Method to update the current state of the sun and
@ -72,24 +72,26 @@ def track_sun(eventbus, statemachine, latitude, longitude):
new_state = SUN_STATE_BELOW_HORIZON
next_change = next_rising
logger.info("Sun:{}. Next change: {}".
format(new_state, next_change.strftime("%H:%M")))
logger.info(
"Sun:{}. Next change: {}".format(new_state,
next_change.strftime("%H:%M")))
state_attributes = {
STATE_ATTRIBUTE_NEXT_SUN_RISING: ha.datetime_to_str(next_rising),
STATE_ATTRIBUTE_NEXT_SUN_SETTING: ha.datetime_to_str(next_setting)
STATE_ATTRIBUTE_NEXT_SUN_RISING: ha.datetime_to_str(next_rising),
STATE_ATTRIBUTE_NEXT_SUN_SETTING: ha.datetime_to_str(next_setting)
}
statemachine.set_state(STATE_CATEGORY_SUN, new_state, state_attributes)
# +10 seconds to be sure that the change has occured
ha.track_time_change(eventbus, update_sun_state,
point_in_time=next_change + timedelta(seconds=10))
point_in_time=next_change + timedelta(seconds=10))
update_sun_state(None)
return True
class DeviceTracker(object):
""" Class that tracks which devices are home and which are not. """
@ -142,7 +144,7 @@ class DeviceTracker(object):
suffix = "_{}".format(tries)
category = STATE_CATEGORY_DEVICE_FORMAT.format(
name + suffix)
name + suffix)
if category not in used_categories:
break
@ -154,18 +156,22 @@ class DeviceTracker(object):
except KeyError:
self.invalid_known_devices_file = False
self.logger.warning(("Invalid {} found. "
self.logger.warning((
"Invalid {} found. "
"We won't update it with new found devices.").
format(KNOWN_DEVICES_FILE))
if len(self.device_state_categories()) == 0:
self.logger.warning("No devices to track. Please update {}.".
format(KNOWN_DEVICES_FILE))
if len(self.device_state_categories) == 0:
self.logger.warning(
"No devices to track. Please update {}.".format(
KNOWN_DEVICES_FILE))
ha.track_time_change(eventbus,
lambda time: self.update_devices(device_scanner.scan_devices()))
lambda time:
self.update_devices(
device_scanner.scan_devices()))
@property
def device_state_categories(self):
""" Returns a list containing all categories
that are maintained for devices. """
@ -195,7 +201,7 @@ class DeviceTracker(object):
# not show up for 1 scan beacuse of reboot etc
for device in temp_tracking_devices:
if (datetime.now() - self.known_devices[device]['last_seen'] >
TIME_SPAN_FOR_ERROR_IN_SCANNING):
TIME_SPAN_FOR_ERROR_IN_SCANNING):
self.statemachine.set_state(
self.known_devices[device]['category'],
@ -203,7 +209,7 @@ class DeviceTracker(object):
# Get the currently used statuses
states_of_devices = [self.statemachine.get_state(category)['state']
for category in self.device_state_categories()]
for category in self.device_state_categories]
# Update the all devices category
all_devices_state = (DEVICE_STATE_HOME if DEVICE_STATE_HOME
@ -226,7 +232,8 @@ class DeviceTracker(object):
is_new_file = not os.path.isfile(KNOWN_DEVICES_FILE)
with open(KNOWN_DEVICES_FILE, 'a') as outp:
self.logger.info(("DeviceTracker:Found {} new devices,"
self.logger.info((
"DeviceTracker:Found {} new devices,"
" updating {}").format(len(unknown_devices),
KNOWN_DEVICES_FILE))
@ -237,19 +244,20 @@ class DeviceTracker(object):
for device in unknown_devices:
# See if the device scanner knows the name
temp_name = self.device_scanner.get_device_name(
device)
temp_name = \
self.device_scanner.get_device_name(device)
name = temp_name if temp_name else "unknown_device"
writer.writerow((device, name, 0))
self.known_devices[device] = {'name':name,
self.known_devices[device] = {'name': name,
'track': False}
except IOError:
self.logger.exception(("DeviceTracker:Error updating {}"
"with {} new devices").format(KNOWN_DEVICES_FILE,
len(unknown_devices)))
self.logger.exception((
"DeviceTracker:Error updating {}"
"with {} new devices").format(
KNOWN_DEVICES_FILE, len(unknown_devices)))
self.lock.release()
@ -268,7 +276,7 @@ class TomatoDeviceScanner(object):
data={'_http_id': http_id,
'exec': 'devlist'},
auth=requests.auth.HTTPBasicAuth(
username, password)).prepare()
username, password)).prepare()
self.parse_api_pattern = re.compile(r"(?P<param>\w*) = (?P<value>.*);")
@ -298,7 +306,6 @@ class TomatoDeviceScanner(object):
filter_named = [item[0] for item in self.last_results['dhcpd_lease']
if item[2] == device]
if len(filter_named) == 0 or filter_named[0] == "":
return None
else:
@ -312,7 +319,7 @@ class TomatoDeviceScanner(object):
# if date_updated is None or the date is too old we scan for new data
if (not self.date_updated or datetime.now() - self.date_updated >
TOMATO_MIN_TIME_BETWEEN_SCANS):
TOMATO_MIN_TIME_BETWEEN_SCANS):
self.logger.info("Tomato:Scanning")
@ -321,15 +328,16 @@ class TomatoDeviceScanner(object):
# Calling and parsing the Tomato api here. We only need the
# wldev and dhcpd_lease values. For API description see:
# http://paulusschoutsen.nl/blog/2013/10/tomato-api-documentation/
# http://paulusschoutsen.nl/
# blog/2013/10/tomato-api-documentation/
if response.status_code == 200:
for param, value in self.parse_api_pattern.findall(
response.text):
for param, value in \
self.parse_api_pattern.findall(response.text):
if param == 'wldev' or param == 'dhcpd_lease':
self.last_results[param] = json.loads(value.
replace("'",'"'))
self.last_results[param] = \
json.loads(value.replace("'", '"'))
self.date_updated = datetime.now()
@ -337,7 +345,8 @@ class TomatoDeviceScanner(object):
elif response.status_code == 401:
# Authentication error
self.logger.exception(("Tomato:Failed to authenticate, "
self.logger.exception((
"Tomato:Failed to authenticate, "
"please check your username and password"))
return False
@ -345,7 +354,8 @@ class TomatoDeviceScanner(object):
except requests.exceptions.ConnectionError:
# We get this if we could not connect to the router or
# an invalid http_id was supplied
self.logger.exception(("Tomato:Failed to connect to the router"
self.logger.exception((
"Tomato:Failed to connect to the router"
" or invalid http_id supplied"))
return False
@ -353,15 +363,15 @@ class TomatoDeviceScanner(object):
except requests.exceptions.Timeout:
# We get this if we could not connect to the router or
# an invalid http_id was supplied
self.logger.exception(("Tomato:Connection to the router "
"timed out"))
self.logger.exception(
"Tomato:Connection to the router timed out")
return False
except ValueError:
# If json decoder could not parse the response
self.logger.exception(("Tomato:Failed to parse response "
"from router"))
self.logger.exception(
"Tomato:Failed to parse response from router")
return False

View File

@ -22,6 +22,7 @@ import homeassistant.httpinterface as hah
METHOD_GET = "get"
METHOD_POST = "post"
def _setup_call_api(host, port, api_password):
""" Helper method to setup a call api method. """
port = port or hah.SERVER_PORT
@ -68,21 +69,21 @@ class EventBus(ha.EventBus):
else:
raise ha.HomeAssistantException(
"Got unexpected result (3): {}.".format(req.text))
"Got unexpected result (3): {}.".format(req.text))
except requests.exceptions.ConnectionError:
self.logger.exception("EventBus:Error connecting to server")
raise ha.HomeAssistantException("Error connecting to server")
except ValueError: # If req.json() can't parse the json
except ValueError: # If req.json() can't parse the json
self.logger.exception("EventBus:Got unexpected result")
raise ha.HomeAssistantException(
"Got unexpected result: {}".format(req.text))
"Got unexpected result: {}".format(req.text))
except KeyError: # If not all expected keys are in the returned JSON
except KeyError: # If not all expected keys are in the returned JSON
self.logger.exception("EventBus:Got unexpected result (2)")
raise ha.HomeAssistantException(
"Got unexpected result (2): {}".format(req.text))
"Got unexpected result (2): {}".format(req.text))
def fire(self, event_type, event_data=None):
""" Fire an event. """
@ -96,7 +97,7 @@ class EventBus(ha.EventBus):
if req.status_code != 200:
error = "Error firing event: {} - {}".format(
req.status_code, req.text)
req.status_code, req.text)
self.logger.error("EventBus:{}".format(error))
raise ha.HomeAssistantException(error)
@ -117,6 +118,7 @@ class EventBus(ha.EventBus):
raise NotImplementedError
class StateMachine(ha.StateMachine):
""" Drop-in replacement for a normal statemachine that communicates with a
remote statemachine.
@ -143,11 +145,11 @@ class StateMachine(ha.StateMachine):
self.logger.exception("StateMachine:Error connecting to server")
return []
except ValueError: # If req.json() can't parse the json
except ValueError: # If req.json() can't parse the json
self.logger.exception("StateMachine:Got unexpected result")
return []
except KeyError: # If 'categories' key not in parsed json
except KeyError: # If 'categories' key not in parsed json
self.logger.exception("StateMachine:Got unexpected result (2)")
return []
@ -170,7 +172,7 @@ class StateMachine(ha.StateMachine):
if req.status_code != 201:
error = "Error changing state: {} - {}".format(
req.status_code, req.text)
req.status_code, req.text)
self.logger.error("StateMachine:{}".format(error))
raise ha.HomeAssistantException(error)
@ -193,9 +195,9 @@ class StateMachine(ha.StateMachine):
if req.status_code == 200:
data = req.json()
return ha.create_state(data['state'],
data['attributes'],
ha.str_to_datetime(data['last_changed']))
return ha.create_state(data['state'], data['attributes'],
ha.str_to_datetime(
data['last_changed']))
elif req.status_code == 422:
# Category does not exist
@ -203,18 +205,18 @@ class StateMachine(ha.StateMachine):
else:
raise ha.HomeAssistantException(
"Got unexpected result (3): {}.".format(req.text))
"Got unexpected result (3): {}.".format(req.text))
except requests.exceptions.ConnectionError:
self.logger.exception("StateMachine:Error connecting to server")
raise ha.HomeAssistantException("Error connecting to server")
except ValueError: # If req.json() can't parse the json
except ValueError: # If req.json() can't parse the json
self.logger.exception("StateMachine:Got unexpected result")
raise ha.HomeAssistantException(
"Got unexpected result: {}".format(req.text))
"Got unexpected result: {}".format(req.text))
except KeyError: # If not all expected keys are in the returned JSON
except KeyError: # If not all expected keys are in the returned JSON
self.logger.exception("StateMachine:Got unexpected result (2)")
raise ha.HomeAssistantException(
"Got unexpected result (2): {}".format(req.text))
"Got unexpected result (2): {}".format(req.text))

View File

@ -19,14 +19,17 @@ API_PASSWORD = "test1234"
HTTP_BASE_URL = "http://127.0.0.1:{}".format(hah.SERVER_PORT)
def _url(path=""):
""" Helper method to generate urls. """
return HTTP_BASE_URL + path
class HAHelper(object): # pylint: disable=too-few-public-methods
class HAHelper(object): # pylint: disable=too-few-public-methods
""" Helper class to keep track of current running HA instance. """
core = None
def ensure_homeassistant_started():
""" Ensures home assistant is started. """
@ -38,7 +41,7 @@ def ensure_homeassistant_started():
core['statemachine'].set_state('test', 'a_state')
hah.HTTPInterface(core['eventbus'], core['statemachine'],
API_PASSWORD)
API_PASSWORD)
core['eventbus'].fire(ha.EVENT_START)
@ -49,6 +52,7 @@ def ensure_homeassistant_started():
return HAHelper.core['eventbus'], HAHelper.core['statemachine']
# pylint: disable=too-many-public-methods
class TestHTTPInterface(unittest.TestCase):
""" Test the HTTP debug interface and API. """
@ -63,36 +67,34 @@ class TestHTTPInterface(unittest.TestCase):
logged in screen. """
with_pw = requests.get(
_url("/?api_password={}".format(API_PASSWORD)))
_url("/?api_password={}".format(API_PASSWORD)))
without_pw = requests.get(_url())
self.assertNotEqual(without_pw.text, with_pw.text)
def test_api_password(self):
""" Test if we get access denied if we omit or provide
a wrong api password. """
req = requests.get(
_url(hah.URL_API_STATES_CATEGORY.format("test")))
_url(hah.URL_API_STATES_CATEGORY.format("test")))
self.assertEqual(req.status_code, 401)
req = requests.get(
_url(hah.URL_API_STATES_CATEGORY.format("test")),
params={"api_password":"not the password"})
_url(hah.URL_API_STATES_CATEGORY.format("test")),
params={"api_password": "not the password"})
self.assertEqual(req.status_code, 401)
def test_debug_change_state(self):
""" Test if we can change a state from the debug interface. """
self.statemachine.set_state("test.test", "not_to_be_set_state")
requests.post(_url(hah.URL_CHANGE_STATE),
data={"category": "test.test",
"new_state":"debug_state_change2",
"api_password":API_PASSWORD})
data={"category": "test.test",
"new_state": "debug_state_change2",
"api_password": API_PASSWORD})
self.assertEqual(self.statemachine.get_state("test.test")['state'],
"debug_state_change2")
@ -112,32 +114,29 @@ class TestHTTPInterface(unittest.TestCase):
requests.post(
_url(hah.URL_FIRE_EVENT),
data={"event_type": "test_event_with_data",
"event_data":'{"test": 1}',
"api_password":API_PASSWORD})
"event_data": '{"test": 1}',
"api_password": API_PASSWORD})
# Allow the event to take place
time.sleep(1)
self.assertEqual(len(test_value), 1)
def test_api_list_state_categories(self):
""" Test if the debug interface allows us to list state categories. """
req = requests.get(_url(hah.URL_API_STATES),
data={"api_password":API_PASSWORD})
data={"api_password": API_PASSWORD})
data = req.json()
self.assertEqual(self.statemachine.categories,
data['categories'])
def test_api_get_state(self):
""" Test if the debug interface allows us to get a state. """
req = requests.get(
_url(hah.URL_API_STATES_CATEGORY.format("test")),
data={"api_password":API_PASSWORD})
_url(hah.URL_API_STATES_CATEGORY.format("test")),
data={"api_password": API_PASSWORD})
data = req.json()
@ -151,8 +150,8 @@ class TestHTTPInterface(unittest.TestCase):
def test_api_get_non_existing_state(self):
""" Test if the debug interface allows us to get a state. """
req = requests.get(
_url(hah.URL_API_STATES_CATEGORY.format("does_not_exist")),
params={"api_password":API_PASSWORD})
_url(hah.URL_API_STATES_CATEGORY.format("does_not_exist")),
params={"api_password": API_PASSWORD})
self.assertEqual(req.status_code, 422)
@ -162,8 +161,8 @@ class TestHTTPInterface(unittest.TestCase):
self.statemachine.set_state("test.test", "not_to_be_set_state")
requests.post(_url(hah.URL_API_STATES_CATEGORY.format("test.test")),
data={"new_state":"debug_state_change2",
"api_password":API_PASSWORD})
data={"new_state": "debug_state_change2",
"api_password": API_PASSWORD})
self.assertEqual(self.statemachine.get_state("test.test")['state'],
"debug_state_change2")
@ -176,13 +175,13 @@ class TestHTTPInterface(unittest.TestCase):
new_state = "debug_state_change"
req = requests.post(
_url(hah.URL_API_STATES_CATEGORY.format(
"test_category_that_does_not_exist")),
data={"new_state": new_state,
"api_password": API_PASSWORD})
_url(hah.URL_API_STATES_CATEGORY.format(
"test_category_that_does_not_exist")),
data={"new_state": new_state,
"api_password": API_PASSWORD})
cur_state = (self.statemachine.
get_state("test_category_that_does_not_exist")['state'])
get_state("test_category_that_does_not_exist")['state'])
self.assertEqual(req.status_code, 201)
self.assertEqual(cur_state, new_state)
@ -200,7 +199,7 @@ class TestHTTPInterface(unittest.TestCase):
requests.post(
_url(hah.URL_API_EVENTS_EVENT.format("test.event_no_data")),
data={"api_password":API_PASSWORD})
data={"api_password": API_PASSWORD})
# Allow the event to take place
time.sleep(1)
@ -222,15 +221,14 @@ class TestHTTPInterface(unittest.TestCase):
requests.post(
_url(hah.URL_API_EVENTS_EVENT.format("test_event_with_data")),
data={"event_data":'{"test": 1}',
"api_password":API_PASSWORD})
data={"event_data": '{"test": 1}',
"api_password": API_PASSWORD})
# Allow the event to take place
time.sleep(1)
self.assertEqual(len(test_value), 1)
# pylint: disable=invalid-name
def test_api_fire_event_with_invalid_json(self):
""" Test if the API allows us to fire an event. """
@ -244,9 +242,8 @@ class TestHTTPInterface(unittest.TestCase):
req = requests.post(
_url(hah.URL_API_EVENTS_EVENT.format("test_event")),
data={"event_data":'not json',
"api_password":API_PASSWORD})
data={"event_data": 'not json',
"api_password": API_PASSWORD})
# It shouldn't but if it fires, allow the event to take place
time.sleep(1)
@ -257,12 +254,13 @@ class TestHTTPInterface(unittest.TestCase):
def test_api_get_event_listeners(self):
""" Test if we can get the list of events being listened for. """
req = requests.get(_url(hah.URL_API_EVENTS),
params={"api_password":API_PASSWORD})
params={"api_password": API_PASSWORD})
data = req.json()
self.assertEqual(data['listeners'], self.eventbus.listeners)
class TestRemote(unittest.TestCase):
""" Test the homeassistant.remote module. """
@ -283,7 +281,6 @@ class TestRemote(unittest.TestCase):
self.assertEqual(self.statemachine.categories,
self.remote_sm.categories)
def test_remote_sm_get_state(self):
""" Test if the debug interface allows us to list state categories. """
remote_state = self.remote_sm.get_state("test")
@ -294,7 +291,6 @@ class TestRemote(unittest.TestCase):
self.assertEqual(remote_state['last_changed'], state['last_changed'])
self.assertEqual(remote_state['attributes'], state['attributes'])
def test_remote_sm_get_non_existing_state(self):
""" Test if the debug interface allows us to list state categories. """
self.assertEqual(self.remote_sm.get_state("test_does_not_exist"), None)

View File

@ -2,7 +2,7 @@
import re
def sanitize_filename(filename):
""" Sanitizes a filename by removing .. / and \\. """
return re.sub(r"(~|(\.\.)|/|\+)", "", filename)