ThreadPool now uses a Priority instead of FIFO queue

This commit is contained in:
Paulus Schoutsen 2014-02-02 21:42:57 -08:00
parent 1d2d7bb7a0
commit 9bfc3e4357
2 changed files with 20 additions and 8 deletions

View File

@ -32,7 +32,10 @@ assert 60 % TIMER_INTERVAL == 0, "60 % TIMER_INTERVAL should be 0!"
BUS_NUM_THREAD = 4
BUS_REPORT_BUSY_TIMEOUT = dt.timedelta(minutes=1)
PRIO_SERVICE_DEFAULT = 1
PRIO_EVENT_STATE = 2
PRIO_EVENT_TIME = 3
PRIO_EVENT_DEFAULT = 4
def start_home_assistant(bus):
""" Start home assistant. """
@ -197,6 +200,7 @@ class Bus(object):
and events.
"""
# pylint: disable=too-many-instance-attributes
def __init__(self, thread_count=None):
self.thread_count = thread_count or BUS_NUM_THREAD
self._event_listeners = {}
@ -238,8 +242,9 @@ class Bus(object):
with self.service_lock:
try:
self.pool.add_job(self._services[domain][service],
service_call)
self.pool.add_job(PRIO_SERVICE_DEFAULT,
(self._services[domain][service],
service_call))
self._check_busy()
@ -272,8 +277,15 @@ class Bus(object):
if not listeners:
return
if event_type == EVENT_TIME_CHANGED:
prio = PRIO_EVENT_TIME
elif event_type == EVENT_STATE_CHANGED:
prio = PRIO_EVENT_STATE
else:
prio = PRIO_EVENT_DEFAULT
for func in listeners:
self.pool.add_job(func, event)
self.pool.add_job(prio, (func, event))
self._check_busy()

View File

@ -88,7 +88,7 @@ class ThreadPool(object):
# pylint: disable=too-few-public-methods
def __init__(self, worker_count, job_handler):
queue = self.queue = Queue.Queue()
queue = self.queue = Queue.PriorityQueue()
current_jobs = self.current_jobs = []
for _ in xrange(worker_count):
@ -97,16 +97,16 @@ class ThreadPool(object):
worker.daemon = True
worker.start()
def add_job(self, *args):
def add_job(self, priority, job):
""" Add a job to be sent to the workers. """
self.queue.put(args)
self.queue.put((priority, job))
def _threadpool_worker(queue, current_jobs, job_handler):
""" Provides the base functionality of a worker for the thread pool. """
while True:
# Get new item from queue
job = queue.get()
job = queue.get()[1]
# Add to current running jobs
job_log = (datetime.datetime.now(), job)