Asyncio event helpers (#3415)

* Automation - Event: Use coroutine

* Convert event helpers to coroutine

* Fix linting

* Add hass.async_add_job

* Automation - Event to use async_add_job
This commit is contained in:
Paulus Schoutsen 2016-09-17 18:28:01 -07:00 committed by GitHub
parent 4076ccf639
commit aca375c312
3 changed files with 31 additions and 10 deletions

View File

@ -4,6 +4,7 @@ Offer event listening automation rules.
For more details about this automation rule, please refer to the documentation For more details about this automation rule, please refer to the documentation
at https://home-assistant.io/components/automation/#event-trigger at https://home-assistant.io/components/automation/#event-trigger
""" """
import asyncio
import logging import logging
import voluptuous as vol import voluptuous as vol
@ -28,11 +29,12 @@ def trigger(hass, config, action):
event_type = config.get(CONF_EVENT_TYPE) event_type = config.get(CONF_EVENT_TYPE)
event_data = config.get(CONF_EVENT_DATA) event_data = config.get(CONF_EVENT_DATA)
@asyncio.coroutine
def handle_event(event): def handle_event(event):
"""Listen for events and calls the action when data matches.""" """Listen for events and calls the action when data matches."""
if not event_data or all(val == event.data.get(key) for key, val if not event_data or all(val == event.data.get(key) for key, val
in event_data.items()): in event_data.items()):
action({ hass.async_add_job(action, {
'trigger': { 'trigger': {
'platform': 'event', 'platform': 'event',
'event': event, 'event': event,

View File

@ -6,6 +6,7 @@ of entities and react to changes.
""" """
# pylint: disable=unused-import, too-many-lines # pylint: disable=unused-import, too-many-lines
import asyncio import asyncio
from concurrent.futures import ThreadPoolExecutor
import enum import enum
import functools as ft import functools as ft
import logging import logging
@ -13,8 +14,8 @@ import os
import re import re
import signal import signal
import sys import sys
import threading
import time import time
from concurrent.futures import ThreadPoolExecutor
from types import MappingProxyType from types import MappingProxyType
@ -205,6 +206,17 @@ class HomeAssistant(object):
""" """
self.pool.add_job(priority, (target,) + args) self.pool.add_job(priority, (target,) + args)
def async_add_job(self, target: Callable[..., None], *args: Any):
"""Add a job from within the eventloop.
target: target to call.
args: parameters for method to call.
"""
if asyncio.iscoroutinefunction(target):
self.loop.create_task(target(*args))
else:
self.add_job(target, *args)
def _loop_empty(self): def _loop_empty(self):
"""Python 3.4.2 empty loop compatibility function.""" """Python 3.4.2 empty loop compatibility function."""
# pylint: disable=protected-access # pylint: disable=protected-access
@ -217,8 +229,6 @@ class HomeAssistant(object):
def block_till_done(self): def block_till_done(self):
"""Block till all pending work is done.""" """Block till all pending work is done."""
import threading
complete = threading.Event() complete = threading.Event()
@asyncio.coroutine @asyncio.coroutine

View File

@ -1,4 +1,5 @@
"""Helpers for listening to events.""" """Helpers for listening to events."""
import asyncio
import functools as ft import functools as ft
from datetime import timedelta from datetime import timedelta
@ -28,6 +29,7 @@ def track_state_change(hass, entity_ids, action, from_state=None,
entity_ids = tuple(entity_id.lower() for entity_id in entity_ids) entity_ids = tuple(entity_id.lower() for entity_id in entity_ids)
@ft.wraps(action) @ft.wraps(action)
@asyncio.coroutine
def state_change_listener(event): def state_change_listener(event):
"""The listener that listens for specific state changes.""" """The listener that listens for specific state changes."""
if entity_ids != MATCH_ALL and \ if entity_ids != MATCH_ALL and \
@ -45,7 +47,7 @@ def track_state_change(hass, entity_ids, action, from_state=None,
new_state = None new_state = None
if _matcher(old_state, from_state) and _matcher(new_state, to_state): if _matcher(old_state, from_state) and _matcher(new_state, to_state):
action(event.data.get('entity_id'), hass.async_add_job(action, event.data.get('entity_id'),
event.data.get('old_state'), event.data.get('old_state'),
event.data.get('new_state')) event.data.get('new_state'))
@ -70,6 +72,7 @@ def track_point_in_utc_time(hass, action, point_in_time):
point_in_time = dt_util.as_utc(point_in_time) point_in_time = dt_util.as_utc(point_in_time)
@ft.wraps(action) @ft.wraps(action)
@asyncio.coroutine
def point_in_time_listener(event): def point_in_time_listener(event):
"""Listen for matching time_changed events.""" """Listen for matching time_changed events."""
now = event.data[ATTR_NOW] now = event.data[ATTR_NOW]
@ -83,9 +86,14 @@ def track_point_in_utc_time(hass, action, point_in_time):
# listener gets lined up twice to be executed. This will make # listener gets lined up twice to be executed. This will make
# sure the second time it does nothing. # sure the second time it does nothing.
point_in_time_listener.run = True point_in_time_listener.run = True
def fire_action():
"""Run the point in time listener action."""
remove() remove()
action(now) action(now)
hass.add_job(fire_action)
remove = hass.bus.listen(EVENT_TIME_CHANGED, point_in_time_listener) remove = hass.bus.listen(EVENT_TIME_CHANGED, point_in_time_listener)
return remove return remove
@ -171,6 +179,7 @@ def track_utc_time_change(hass, action, year=None, month=None, day=None,
hour, minute, second = pmp(hour), pmp(minute), pmp(second) hour, minute, second = pmp(hour), pmp(minute), pmp(second)
@ft.wraps(action) @ft.wraps(action)
@asyncio.coroutine
def pattern_time_change_listener(event): def pattern_time_change_listener(event):
"""Listen for matching time_changed events.""" """Listen for matching time_changed events."""
now = event.data[ATTR_NOW] now = event.data[ATTR_NOW]
@ -187,7 +196,7 @@ def track_utc_time_change(hass, action, year=None, month=None, day=None,
mat(now.minute, minute) and \ mat(now.minute, minute) and \
mat(now.second, second): mat(now.second, second):
action(now) hass.async_add_job(action, now)
return hass.bus.listen(EVENT_TIME_CHANGED, pattern_time_change_listener) return hass.bus.listen(EVENT_TIME_CHANGED, pattern_time_change_listener)