From 2758d8152584a96319d8a8cf7ff6b5cc6d7f161c Mon Sep 17 00:00:00 2001 From: Paulus Schoutsen Date: Sun, 26 Jan 2014 18:44:36 -0800 Subject: [PATCH] New events and service calls now use a shared thread pool --- homeassistant/__init__.py | 152 ++++++++++++++++++++++++-------------- homeassistant/util.py | 56 +++++++++++++- 2 files changed, 151 insertions(+), 57 deletions(-) diff --git a/homeassistant/__init__.py b/homeassistant/__init__.py index fe3f51a1e46..15c7826e105 100644 --- a/homeassistant/__init__.py +++ b/homeassistant/__init__.py @@ -9,7 +9,6 @@ of entities and react to changes. import time import logging import threading -from collections import namedtuple import datetime as dt import homeassistant.util as util @@ -33,6 +32,8 @@ TIMER_INTERVAL = 10 # seconds # every minute. assert 60 % TIMER_INTERVAL == 0, "60 % TIMER_INTERVAL should be 0!" +BUS_NUM_THREAD = 4 + def start_home_assistant(bus): """ Start home assistant. """ @@ -117,14 +118,64 @@ def track_time_change(bus, action, # point_in_time are exact points in time # so we always remove it after fire if listen_once or point_in_time: - event.bus.remove_event_listener(EVENT_TIME_CHANGED, listener) + bus.remove_event_listener(EVENT_TIME_CHANGED, listener) action(now) bus.listen_event(EVENT_TIME_CHANGED, listener) -ServiceCall = namedtuple("ServiceCall", ["bus", "domain", "service", "data"]) -Event = namedtuple("Event", ["bus", "event_type", "data"]) + +def create_bus_job_handler(logger): + """ Creates a job handler that logs errors to supplied `logger`. """ + + def job_handler(job): + """ Called whenever a job is available to do. """ + try: + func, arg = job + func(arg) + except Exception: # pylint: disable=broad-except + # Catch any exception our service/event_listener might throw + # We do not want to crash our ThreadPool + logger.exception("BusHandler:Exception doing job") + + return job_handler + + +# pylint: disable=too-few-public-methods +class ServiceCall(object): + """ Represents a call to a service. """ + + __slots__ = ['domain', 'service', 'data'] + + def __init__(self, domain, service, data=None): + self.domain = domain + self.service = service + self.data = data or {} + + def __repr__(self): + if self.data: + return "".format( + self.domain, self.service, util.repr_helper(self.data)) + else: + return "".format(self.domain, self.service) + + +# pylint: disable=too-few-public-methods +class Event(object): + """ Represents an event within the Bus. """ + + __slots__ = ['event_type', 'data'] + + def __init__(self, event_type, data=None): + self.event_type = event_type + self.data = data or {} + + def __repr__(self): + if self.data: + return "".format( + self.event_type, util.repr_helper(self.data)) + else: + return "".format(self.event_type) class Bus(object): @@ -132,12 +183,15 @@ class Bus(object): and events. """ - def __init__(self): + def __init__(self, thread_count=None): + thread_count = thread_count or BUS_NUM_THREAD self._event_listeners = {} self._services = {} self.logger = logging.getLogger(__name__) self.event_lock = threading.Lock() self.service_lock = threading.Lock() + self.pool = util.ThreadPool(thread_count, + create_bus_job_handler(self.logger)) @property def services(self): @@ -157,71 +211,51 @@ class Bus(object): def has_service(self, domain, service): """ Returns True if specified service exists. """ - return (domain in self._services and - service in self._services[domain]) + try: + return service in self._services[domain] + except KeyError: # if key 'domain' does not exist + return False def call_service(self, domain, service, service_data=None): """ Calls a service. """ - if not self.has_service(domain, service): - raise ServiceDoesNotExistError( - "Service does not exist: {}/{}".format(domain, service)) + service_call = ServiceCall(domain, service, service_data) with self.service_lock: - service_data = service_data or {} + try: + self.pool.add_job(self._services[domain][service], + service_call) - def run(): - """ Executes a service. """ - service_call = ServiceCall(self, domain, service, service_data) + except KeyError: # if key domain or service does not exist + raise ServiceDoesNotExistError( + "Service does not exist: {}/{}".format(domain, service)) - try: - self._services[domain][service](service_call) - except Exception: # pylint: disable=broad-except - self.logger.exception( - "Bus:Exception in service {}/{}".format( - domain, service)) - - # We dont want the bus to be blocking - run in a thread. - threading.Thread(target=run).start() - - def register_service(self, domain, service, service_callback): + def register_service(self, domain, service, service_func): """ Register a service. """ with self.service_lock: try: - self._services[domain][service] = service_callback - except KeyError: - # Domain does not exist yet - self._services[domain] = {service: service_callback} + self._services[domain][service] = service_func + + except KeyError: # Domain does not exist yet in self._services + self._services[domain] = {service: service_func} def fire_event(self, event_type, event_data=None): """ Fire an event. """ with self.event_lock: # Copy the list of the current listeners because some listeners - # choose to remove themselves as a listener while being executed - # which causes the iterator to be confused. + # remove themselves as a listener while being executed which + # causes the iterator to be confused. get = self._event_listeners.get listeners = get(MATCH_ALL, []) + get(event_type, []) - self.logger.info("Bus:Event {}: {}".format( - event_type, event_data)) + event = Event(event_type, event_data) + + self.logger.info("Bus:Handling {}".format(event)) if not listeners: return - event_data = event_data or {} - - def run(): - """ Fire listeners for event. """ - event = Event(self, event_type, event_data) - - for listener in listeners: - try: - listener(event) - except Exception: # pylint: disable=broad-except - self.logger.exception( - "Bus:Exception in event listener") - - # We dont want the bus to be blocking - run in a thread. - threading.Thread(target=run).start() + for func in listeners: + self.pool.add_job(func, event) def listen_event(self, event_type, listener): """ Listen for all events or events of a specific type. @@ -232,6 +266,7 @@ class Bus(object): with self.event_lock: try: self._event_listeners[event_type].append(listener) + except KeyError: # event_type did not exist self._event_listeners[event_type] = [listener] @@ -242,6 +277,7 @@ class Bus(object): as event_type. Note: at the moment it is impossible to remove a one time listener. + Note2: it is also not guaranteed that it will only run once """ def onetime_listener(event): """ Removes listener from eventbus and then fires listener. """ @@ -259,11 +295,11 @@ class Bus(object): # delete event_type list if empty if not self._event_listeners[event_type]: - del self._event_listeners[event_type] + self._event_listeners.pop(event_type) - except (KeyError, ValueError): - # KeyError is key event_type did not exist - # ValueError if the list [event_type] did not contain listener + except (KeyError, AttributeError): + # KeyError is key event_type listener did not exist + # AttributeError if listener did not exist within event_type pass @@ -317,13 +353,17 @@ class State(object): json_dict['state'], json_dict.get('attributes'), last_changed) - except KeyError: # if key 'state' did not exist + except KeyError: # if key 'entity_id' or 'state' did not exist return None def __repr__(self): - return "".format( - self.state, self.attributes, - util.datetime_to_str(self.last_changed)) + if self.attributes: + return "".format( + self.state, util.repr_helper(self.attributes), + util.datetime_to_str(self.last_changed)) + else: + return "".format( + self.state, util.datetime_to_str(self.last_changed)) class StateMachine(object): diff --git a/homeassistant/util.py b/homeassistant/util.py index af3f54d7812..da6ff836569 100644 --- a/homeassistant/util.py +++ b/homeassistant/util.py @@ -1,5 +1,6 @@ """ Helper methods for various modules. """ - +import threading +import Queue import datetime import re @@ -53,3 +54,56 @@ def filter_entity_ids(entity_ids, domain_filter=None, strip_domain=False): for entity_id in entity_ids if not domain_filter or entity_id.startswith(domain_filter) ] + + +def repr_helper(inp): + """ Helps creating a more readable string representation of objects. """ + if isinstance(inp, dict): + return ", ".join( + repr_helper(key)+"="+repr_helper(item) for key, item in inp.items() + ) + elif isinstance(inp, list): + return '[' + ', '.join(inp) + ']' + elif isinstance(inp, datetime.datetime): + return datetime_to_str(inp) + else: + return str(inp) + + +# Reason why I decided to roll my own ThreadPool instead of using +# multiprocessing.dummy.pool or even better, use multiprocessing.pool and +# not be hurt by the GIL in the cpython interpreter: +# 1. The built in threadpool does not allow me to create custom workers and so +# I would have to wrap every listener that I passed into it with code to log +# the exceptions. Saving a reference to the logger in the worker seemed +# like a more sane thing to do. +# 2. Most event listeners are simple checks if attributes match. If the method +# that they will call takes a long time to complete it might be better to +# put that request in a seperate thread. This is for every component to +# decide on its own instead of enforcing it for everyone. +class ThreadPool(object): + """ A simple queue-based thread pool. + + Will initiate it's workers using worker(queue).start() """ + + # pylint: disable=too-few-public-methods + def __init__(self, worker_count, job_handler): + queue = self.queue = Queue.Queue() + + for _ in xrange(worker_count): + worker = threading.Thread(target=_threadpool_worker, + args=(queue, job_handler)) + worker.daemon = True + worker.start() + + def add_job(self, *args): + """ Add a job to be sent to the workers. """ + self.queue.put(args) + + +def _threadpool_worker(queue, job_handler): + """ Provides the base functionality of a worker for the thread pool. """ + while True: + job = queue.get() + job_handler(job) + queue.task_done()