Allow circular dependency with discovery (#2616)

This commit is contained in:
Paulus Schoutsen
2016-07-25 22:49:10 -07:00
committed by GitHub
parent 9c76b30e24
commit f1632496f0
10 changed files with 157 additions and 60 deletions

View File

@@ -14,6 +14,7 @@ import threading
import time
from types import MappingProxyType
from typing import Any, Callable
import voluptuous as vol
import homeassistant.helpers.temperature as temp_helper
@@ -62,6 +63,29 @@ class CoreState(enum.Enum):
return self.value
class JobPriority(util.OrderedEnum):
"""Provides job priorities for event bus jobs."""
EVENT_CALLBACK = 0
EVENT_SERVICE = 1
EVENT_STATE = 2
EVENT_TIME = 3
EVENT_DEFAULT = 4
@staticmethod
def from_event_type(event_type):
"""Return a priority based on event type."""
if event_type == EVENT_TIME_CHANGED:
return JobPriority.EVENT_TIME
elif event_type == EVENT_STATE_CHANGED:
return JobPriority.EVENT_STATE
elif event_type == EVENT_CALL_SERVICE:
return JobPriority.EVENT_SERVICE
elif event_type == EVENT_SERVICE_EXECUTED:
return JobPriority.EVENT_CALLBACK
return JobPriority.EVENT_DEFAULT
class HomeAssistant(object):
"""Root object of the Home Assistant home automation."""
@@ -69,7 +93,7 @@ class HomeAssistant(object):
"""Initialize new Home Assistant object."""
self.pool = pool = create_worker_pool()
self.bus = EventBus(pool)
self.services = ServiceRegistry(self.bus, pool)
self.services = ServiceRegistry(self.bus, self.add_job)
self.states = StateMachine(self.bus)
self.config = Config()
self.state = CoreState.not_running
@@ -90,6 +114,17 @@ class HomeAssistant(object):
self.pool.block_till_done()
self.state = CoreState.running
def add_job(self,
target: Callable[..., None],
*args: Any,
priority: JobPriority=JobPriority.EVENT_DEFAULT) -> None:
"""Add job to the worker pool.
target: target to call.
args: parameters for method to call.
"""
self.pool.add_job(priority, (target,) + args)
def block_till_stopped(self) -> int:
"""Register service homeassistant/stop and will block until called."""
request_shutdown = threading.Event()
@@ -141,30 +176,6 @@ class HomeAssistant(object):
self.state = CoreState.not_running
class JobPriority(util.OrderedEnum):
"""Provides job priorities for event bus jobs."""
EVENT_CALLBACK = 0
EVENT_SERVICE = 1
EVENT_STATE = 2
EVENT_TIME = 3
EVENT_DEFAULT = 4
@staticmethod
def from_event_type(event_type):
"""Return a priority based on event type."""
if event_type == EVENT_TIME_CHANGED:
return JobPriority.EVENT_TIME
elif event_type == EVENT_STATE_CHANGED:
return JobPriority.EVENT_STATE
elif event_type == EVENT_CALL_SERVICE:
return JobPriority.EVENT_SERVICE
elif event_type == EVENT_SERVICE_EXECUTED:
return JobPriority.EVENT_CALLBACK
else:
return JobPriority.EVENT_DEFAULT
class EventOrigin(enum.Enum):
"""Represent the origin of an event."""
@@ -222,11 +233,11 @@ class Event(object):
class EventBus(object):
"""Allows firing of and listening for events."""
def __init__(self, pool=None):
def __init__(self, pool: util.ThreadPool):
"""Initialize a new event bus."""
self._listeners = {}
self._lock = threading.Lock()
self._pool = pool or create_worker_pool()
self._pool = pool
@property
def listeners(self):
@@ -235,7 +246,7 @@ class EventBus(object):
return {key: len(self._listeners[key])
for key in self._listeners}
def fire(self, event_type, event_data=None, origin=EventOrigin.local):
def fire(self, event_type: str, event_data=None, origin=EventOrigin.local):
"""Fire an event."""
if not self._pool.running:
raise HomeAssistantError('Home Assistant has shut down.')
@@ -575,11 +586,11 @@ class ServiceCall(object):
class ServiceRegistry(object):
"""Offers services over the eventbus."""
def __init__(self, bus, pool=None):
def __init__(self, bus, add_job):
"""Initialize a service registry."""
self._services = {}
self._lock = threading.Lock()
self._pool = pool or create_worker_pool()
self._add_job = add_job
self._bus = bus
self._cur_id = 0
bus.listen(EVENT_CALL_SERVICE, self._event_to_service_call)
@@ -678,13 +689,11 @@ class ServiceRegistry(object):
service_call = ServiceCall(domain, service, service_data, call_id)
# Add a job to the pool that calls _execute_service
self._pool.add_job(JobPriority.EVENT_SERVICE,
(self._execute_service,
(service_handler, service_call)))
self._add_job(self._execute_service, service_handler, service_call,
priority=JobPriority.EVENT_SERVICE)
def _execute_service(self, service_and_call):
def _execute_service(self, service, call):
"""Execute a service and fires a SERVICE_EXECUTED event."""
service, call = service_and_call
service(call)
if call.call_id is not None:
@@ -831,8 +840,8 @@ def create_worker_pool(worker_count=None):
def job_handler(job):
"""Called whenever a job is available to do."""
try:
func, arg = job
func(arg)
func, *args = job
func(*args)
except Exception: # pylint: disable=broad-except
# Catch any exception our service/event_listener might throw
# We do not want to crash our ThreadPool