Complain if too much jobs queued.

This commit is contained in:
Paulus Schoutsen 2014-01-29 22:48:35 -08:00
parent c438f35bcd
commit e586d8debc
2 changed files with 58 additions and 9 deletions

View File

@ -31,6 +31,7 @@ TIMER_INTERVAL = 10 # seconds
assert 60 % TIMER_INTERVAL == 0, "60 % TIMER_INTERVAL should be 0!"
BUS_NUM_THREAD = 4
BUS_REPORT_BUSY_TIMEOUT = dt.timedelta(minutes=1)
def start_home_assistant(bus):
@ -75,7 +76,7 @@ def track_state_change(bus, entity_id, action, from_state=None, to_state=None):
from_state = _process_match_param(from_state)
to_state = _process_match_param(to_state)
def listener(event):
def state_listener(event):
""" State change listener that listens for specific state changes. """
if entity_id == event.data['entity_id'] and \
_matcher(event.data['old_state'].state, from_state) and \
@ -85,7 +86,10 @@ def track_state_change(bus, entity_id, action, from_state=None, to_state=None):
event.data['old_state'],
event.data['new_state'])
bus.listen_event(EVENT_STATE_CHANGED, listener)
# Let the string representation make sense
state_listener.__name__ = 'State listener for {}'.format(action.__name__)
bus.listen_event(EVENT_STATE_CHANGED, state_listener)
# pylint: disable=too-many-arguments
@ -98,7 +102,7 @@ def track_time_change(bus, action,
year, month, day = pmp(year), pmp(month), pmp(day)
hour, minute, second = pmp(hour), pmp(minute), pmp(second)
def listener(event):
def time_listener(event):
""" Listens for matching time_changed events. """
now = event.data['now']
@ -116,11 +120,14 @@ def track_time_change(bus, action,
# point_in_time are exact points in time
# so we always remove it after fire
if listen_once or point_in_time:
bus.remove_event_listener(EVENT_TIME_CHANGED, listener)
bus.remove_event_listener(EVENT_TIME_CHANGED, time_listener)
action(now)
bus.listen_event(EVENT_TIME_CHANGED, listener)
# Let the string representation make sense
time_listener.__name__ = 'Time listener for {}'.format(action.__name__)
bus.listen_event(EVENT_TIME_CHANGED, time_listener)
def create_bus_job_handler(logger):
@ -182,13 +189,15 @@ class Bus(object):
"""
def __init__(self, thread_count=None):
thread_count = thread_count or BUS_NUM_THREAD
self.thread_count = thread_count or BUS_NUM_THREAD
self._event_listeners = {}
self._services = {}
self.logger = logging.getLogger(__name__)
self.event_lock = threading.Lock()
self.service_lock = threading.Lock()
self.pool = util.ThreadPool(thread_count,
self.last_busy_notice = dt.datetime.now()
self.pool = util.ThreadPool(self.thread_count,
create_bus_job_handler(self.logger))
@property
@ -223,6 +232,8 @@ class Bus(object):
self.pool.add_job(self._services[domain][service],
service_call)
self._check_busy()
except KeyError: # if key domain or service does not exist
raise ServiceDoesNotExistError(
"Service does not exist: {}/{}".format(domain, service))
@ -255,6 +266,8 @@ class Bus(object):
for func in listeners:
self.pool.add_job(func, event)
self._check_busy()
def listen_event(self, event_type, listener):
""" Listen for all events or events of a specific type.
@ -283,6 +296,9 @@ class Bus(object):
listener(event)
onetime_listener.__name__ = "One time listener for {}".format(
listener.__name__)
self.listen_event(event_type, onetime_listener)
def remove_event_listener(self, event_type, listener):
@ -300,6 +316,26 @@ class Bus(object):
# AttributeError if listener did not exist within event_type
pass
def _check_busy(self):
""" Complain if we have more than twice as many jobs queued as threads
and if we didn't complain about it recently. """
if self.pool.queue.qsize() / self.thread_count >= 2 and \
dt.datetime.now()-self.last_busy_notice > BUS_REPORT_BUSY_TIMEOUT:
self.last_busy_notice = dt.datetime.now()
logger = self.logger
logger.error(
"Bus:All {} threads are busy and {} jobs pending".format(
self.thread_count, self.pool.queue.qsize()))
jobs = self.pool.current_jobs
for start, job in jobs:
logger.error("Bus:Current job from {}: {}".format(
util.datetime_to_str(start), job))
class State(object):
""" Object to represent a state within the state machine. """

View File

@ -89,10 +89,11 @@ class ThreadPool(object):
# pylint: disable=too-few-public-methods
def __init__(self, worker_count, job_handler):
queue = self.queue = Queue.Queue()
current_jobs = self.current_jobs = []
for _ in xrange(worker_count):
worker = threading.Thread(target=_threadpool_worker,
args=(queue, job_handler))
args=(queue, current_jobs, job_handler))
worker.daemon = True
worker.start()
@ -101,9 +102,21 @@ class ThreadPool(object):
self.queue.put(args)
def _threadpool_worker(queue, job_handler):
def _threadpool_worker(queue, current_jobs, job_handler):
""" Provides the base functionality of a worker for the thread pool. """
while True:
# Get new item from queue
job = queue.get()
# Add to current running jobs
job_log = (datetime.datetime.now(), job)
current_jobs.append(job_log)
# Do the job
job_handler(job)
# Remove from current running job
current_jobs.remove(job_log)
# Tell queue a task is done
queue.task_done()