From 9bfc3e4357a45e63cb2bd80cc0059efc67dc5e93 Mon Sep 17 00:00:00 2001 From: Paulus Schoutsen Date: Sun, 2 Feb 2014 21:42:57 -0800 Subject: [PATCH] ThreadPool now uses a Priority instead of FIFO queue --- homeassistant/__init__.py | 20 ++++++++++++++++---- homeassistant/util.py | 8 ++++---- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/homeassistant/__init__.py b/homeassistant/__init__.py index 41f4427f8f1..2ac9529df04 100644 --- a/homeassistant/__init__.py +++ b/homeassistant/__init__.py @@ -32,7 +32,10 @@ assert 60 % TIMER_INTERVAL == 0, "60 % TIMER_INTERVAL should be 0!" BUS_NUM_THREAD = 4 BUS_REPORT_BUSY_TIMEOUT = dt.timedelta(minutes=1) - +PRIO_SERVICE_DEFAULT = 1 +PRIO_EVENT_STATE = 2 +PRIO_EVENT_TIME = 3 +PRIO_EVENT_DEFAULT = 4 def start_home_assistant(bus): """ Start home assistant. """ @@ -197,6 +200,7 @@ class Bus(object): and events. """ + # pylint: disable=too-many-instance-attributes def __init__(self, thread_count=None): self.thread_count = thread_count or BUS_NUM_THREAD self._event_listeners = {} @@ -238,8 +242,9 @@ class Bus(object): with self.service_lock: try: - self.pool.add_job(self._services[domain][service], - service_call) + self.pool.add_job(PRIO_SERVICE_DEFAULT, + (self._services[domain][service], + service_call)) self._check_busy() @@ -272,8 +277,15 @@ class Bus(object): if not listeners: return + if event_type == EVENT_TIME_CHANGED: + prio = PRIO_EVENT_TIME + elif event_type == EVENT_STATE_CHANGED: + prio = PRIO_EVENT_STATE + else: + prio = PRIO_EVENT_DEFAULT + for func in listeners: - self.pool.add_job(func, event) + self.pool.add_job(prio, (func, event)) self._check_busy() diff --git a/homeassistant/util.py b/homeassistant/util.py index 86b72724e80..3298eb60dd7 100644 --- a/homeassistant/util.py +++ b/homeassistant/util.py @@ -88,7 +88,7 @@ class ThreadPool(object): # pylint: disable=too-few-public-methods def __init__(self, worker_count, job_handler): - queue = self.queue = Queue.Queue() + queue = self.queue = Queue.PriorityQueue() current_jobs = self.current_jobs = [] for _ in xrange(worker_count): @@ -97,16 +97,16 @@ class ThreadPool(object): worker.daemon = True worker.start() - def add_job(self, *args): + def add_job(self, priority, job): """ Add a job to be sent to the workers. """ - self.queue.put(args) + self.queue.put((priority, job)) def _threadpool_worker(queue, current_jobs, job_handler): """ Provides the base functionality of a worker for the thread pool. """ while True: # Get new item from queue - job = queue.get() + job = queue.get()[1] # Add to current running jobs job_log = (datetime.datetime.now(), job)