Bus and StateMachine are now thread-safe

This commit is contained in:
Paulus Schoutsen 2014-01-19 21:39:57 -08:00
parent ac0ca5d001
commit e7f5953362

View File

@ -158,85 +158,90 @@ class Bus(object):
self._event_listeners = {}
self._services = {}
self.logger = logging.getLogger(__name__)
self.event_lock = threading.Lock()
self.service_lock = threading.Lock()
@property
def services(self):
""" Dict with per domain a list of available services. """
return {domain: self._services[domain].keys()
for domain in self._services}
with self.service_lock:
return {domain: self._services[domain].keys()
for domain in self._services}
@property
def event_listeners(self):
""" Dict with events that is being listened for and the number
of listeners.
"""
return {key: len(self._event_listeners[key])
for key in self._event_listeners}
with self.event_lock:
return {key: len(self._event_listeners[key])
for key in self._event_listeners}
def call_service(self, domain, service, service_data=None):
""" Calls a service. """
try:
self._services[domain][service]
except KeyError:
# Domain or Service does not exist
raise ServiceDoesNotExistException(
"Service does not exist: {}/{}".format(domain, service))
if not service_data:
service_data = {}
def run():
""" Executes a service. """
service_call = ServiceCall(self, domain, service, service_data)
with self.service_lock:
try:
self._services[domain][service](service_call)
except Exception: # pylint: disable=broad-except
self.logger.exception("Bus:Exception in service {}/{}".format(
domain, service))
self._services[domain][service]
except KeyError:
# Domain or Service does not exist
raise ServiceDoesNotExistException(
"Service does not exist: {}/{}".format(domain, service))
# We dont want the eventbus to be blocking - run in a thread.
threading.Thread(target=run).start()
service_data = service_data or {}
def run():
""" Executes a service. """
service_call = ServiceCall(self, domain, service, service_data)
try:
self._services[domain][service](service_call)
except Exception: # pylint: disable=broad-except
self.logger.exception(
"Bus:Exception in service {}/{}".format(
domain, service))
# We dont want the bus to be blocking - run in a thread.
threading.Thread(target=run).start()
def register_service(self, domain, service, service_callback):
""" Register a service. """
try:
self._services[domain][service] = service_callback
except KeyError:
# Domain does not exist yet
self._services[domain] = {service: service_callback}
with self.service_lock:
try:
self._services[domain][service] = service_callback
except KeyError:
# Domain does not exist yet
self._services[domain] = {service: service_callback}
def fire_event(self, event_type, event_data=None):
""" Fire an event. """
with self.event_lock:
# Copy the list of the current listeners because some listeners
# choose to remove themselves as a listener while being executed
# which causes the iterator to be confused.
get = self._event_listeners.get
listeners = get(MATCH_ALL, []) + get(event_type, [])
# Copy the list of the current listeners because some listeners
# choose to remove themselves as a listener while being executed
# which causes the iterator to be confused.
get = self._event_listeners.get
listeners = get(MATCH_ALL, []) + get(event_type, [])
if not listeners:
return
if not listeners:
return
event_data = event_data or {}
event_data = event_data or {}
self.logger.info("Bus:Event {}: {}".format(
event_type, event_data))
self.logger.info("Bus:Event {}: {}".format(
event_type, event_data))
def run():
""" Fire listeners for event. """
event = Event(self, event_type, event_data)
def run():
""" Fire listeners for event. """
event = Event(self, event_type, event_data)
for listener in listeners:
try:
listener(event)
except Exception: # pylint: disable=broad-except
self.logger.exception(
"Bus:Exception in event listener")
for listener in listeners:
try:
listener(event)
except Exception: # pylint: disable=broad-except
self.logger.exception("Bus:Exception in event listener")
# We dont want the bus to be blocking - run in a thread.
threading.Thread(target=run).start()
# We dont want the bus to be blocking - run in a thread.
threading.Thread(target=run).start()
def listen_event(self, event_type, listener):
""" Listen for all events or events of a specific type.
@ -244,10 +249,11 @@ class Bus(object):
To listen to all events specify the constant ``MATCH_ALL``
as event_type.
"""
try:
self._event_listeners[event_type].append(listener)
except KeyError: # event_type did not exist
self._event_listeners[event_type] = [listener]
with self.event_lock:
try:
self._event_listeners[event_type].append(listener)
except KeyError: # event_type did not exist
self._event_listeners[event_type] = [listener]
def listen_once_event(self, event_type, listener):
""" Listen once for event of a specific type.
@ -257,7 +263,6 @@ class Bus(object):
Note: at the moment it is impossible to remove a one time listener.
"""
def onetime_listener(event):
""" Removes listener from eventbus and then fires listener. """
self.remove_event_listener(event_type, onetime_listener)
@ -268,14 +273,18 @@ class Bus(object):
def remove_event_listener(self, event_type, listener):
""" Removes a listener of a specific event_type. """
try:
self._event_listeners[event_type].remove(listener)
with self.event_lock:
try:
self._event_listeners[event_type].remove(listener)
if len(self._event_listeners[event_type]) == 0:
del self._event_listeners[event_type]
# delete event_type list if empty
if not self._event_listeners[event_type]:
del self._event_listeners[event_type]
except (KeyError, ValueError):
pass
except (KeyError, ValueError):
# KeyError is key event_type did not exist
# ValueError if the list [event_type] did not contain listener
pass
class State(object):
@ -346,20 +355,22 @@ class StateMachine(object):
@property
def categories(self):
""" List of categories which states are being tracked. """
return self.states.keys()
with self.lock:
return self.states.keys()
def remove_category(self, category):
""" Removes a category from the state machine.
Returns boolean to indicate if a category was removed. """
try:
del self.states[category]
with self.lock:
try:
del self.states[category]
return True
return True
except KeyError:
# if category does not exist
return False
except KeyError:
# if category does not exist
return False
def set_state(self, category, new_state, attributes=None):
""" Set the state of a category, add category if it does not exist.
@ -368,44 +379,44 @@ class StateMachine(object):
attributes = attributes or {}
self.lock.acquire()
# Add category if it does not exist
if category not in self.states:
self.states[category] = State(new_state, attributes)
# Change state and fire listeners
else:
old_state = self.states[category]
if old_state.state != new_state or \
old_state.attributes != attributes:
with self.lock:
# Add category if it does not exist
if category not in self.states:
self.states[category] = State(new_state, attributes)
self.bus.fire_event(EVENT_STATE_CHANGED,
{'category': category,
'old_state': old_state,
'new_state': self.states[category]})
# Change state and fire listeners
else:
old_state = self.states[category]
self.lock.release()
if old_state.state != new_state or \
old_state.attributes != attributes:
self.states[category] = State(new_state, attributes)
self.bus.fire_event(EVENT_STATE_CHANGED,
{'category': category,
'old_state': old_state,
'new_state': self.states[category]})
def get_state(self, category):
""" Returns a dict (state, last_changed, attributes) describing
the state of the specified category. """
try:
# Make a copy so people won't mutate the state
return self.states[category].copy()
with self.lock:
try:
# Make a copy so people won't mutate the state
return self.states[category].copy()
except KeyError:
# If category does not exist
return None
except KeyError:
# If category does not exist
return None
def is_state(self, category, state):
""" Returns True if category exists and is specified state. """
cur_state = self.get_state(category)
return cur_state and cur_state.state == state
try:
return self.get_state(category).state == state
except AttributeError:
# get_state returned None
return False
class Timer(threading.Thread):