From 548d415f94c8a180a43ef93b2c07bfba47c3903a Mon Sep 17 00:00:00 2001 From: Paulus Schoutsen Date: Mon, 16 May 2016 19:47:15 -0700 Subject: [PATCH] Clean up EventStream --- homeassistant/components/api.py | 46 +++++++++++---------------------- homeassistant/util/eventlet.py | 9 ------- 2 files changed, 15 insertions(+), 40 deletions(-) delete mode 100644 homeassistant/util/eventlet.py diff --git a/homeassistant/components/api.py b/homeassistant/components/api.py index 3819536f666..0296d02ad83 100644 --- a/homeassistant/components/api.py +++ b/homeassistant/components/api.py @@ -6,6 +6,7 @@ https://home-assistant.io/developers/api/ """ import json import logging +from time import time import homeassistant.core as ha import homeassistant.remote as rem @@ -21,7 +22,6 @@ from homeassistant.const import ( from homeassistant.exceptions import TemplateError from homeassistant.helpers.state import TrackStates from homeassistant.helpers import template -from homeassistant.helpers.event import track_utc_time_change from homeassistant.components.http import HomeAssistantView DOMAIN = 'api' @@ -65,33 +65,25 @@ class APIStatusView(HomeAssistantView): class APIEventStream(HomeAssistantView): - """View to handle EventSt requests.""" + """View to handle EventStream requests.""" url = URL_API_STREAM name = "api:stream" def get(self, request): """Provide a streaming interface for the event bus.""" - from eventlet.queue import Empty + from eventlet.queue import LightQueue, Empty import eventlet - import homeassistant.util.eventlet as eventlet_util cur_hub = eventlet.hubs.get_hub() request.environ['eventlet.minimum_write_chunk_size'] = 0 - - to_write = eventlet.Queue() + to_write = LightQueue() stop_obj = object() - attached_ping = None restrict = request.args.get('restrict') if restrict: restrict = restrict.split(',') - def thread_ping(now): - """Called from time thread to add ping to queue.""" - _LOGGER.debug('STREAM %s PING', id(stop_obj)) - eventlet_util.spawn(cur_hub, to_write.put, STREAM_PING_PAYLOAD) - def thread_forward_events(event): """Forward events to the open request.""" if event.event_type == EVENT_TIME_CHANGED: @@ -104,34 +96,20 @@ class APIEventStream(HomeAssistantView): else: data = json.dumps(event, cls=rem.JSONEncoder) - eventlet_util.spawn(cur_hub, to_write.put, data) - - def cleanup(): - """Clean up HA listeners.""" - _LOGGER.debug("STREAM %s CLEANING UP", id(stop_obj)) - self.hass.bus.remove_listener(EVENT_TIME_CHANGED, attached_ping) - - if restrict: - for event in restrict: - self.hass.bus.remove_listener(event, thread_forward_events) - else: - self.hass.bus.remove_listener(MATCH_ALL, thread_forward_events) + cur_hub.schedule_call_global(0, lambda: to_write.put(data)) def stream(): """Stream events to response.""" - nonlocal attached_ping - if restrict: for event_type in restrict: self.hass.bus.listen(event_type, thread_forward_events) else: self.hass.bus.listen(MATCH_ALL, thread_forward_events) - attached_ping = track_utc_time_change( - self.hass, thread_ping, second=(0, 30)) - _LOGGER.debug('STREAM %s ATTACHED', id(stop_obj)) + last_msg = time() + while True: try: # Somehow our queue.get takes too long to @@ -151,13 +129,19 @@ class APIEventStream(HomeAssistantView): _LOGGER.debug('STREAM %s WRITING %s', id(stop_obj), msg.strip()) yield msg.encode("UTF-8") + last_msg = time() except Empty: - pass + if time() - last_msg > 50: + to_write.put(STREAM_PING_PAYLOAD) except GeneratorExit: _LOGGER.debug('STREAM %s RESPONSE CLOSED', id(stop_obj)) break - cleanup() + if restrict: + for event in restrict: + self.hass.bus.remove_listener(event, thread_forward_events) + else: + self.hass.bus.remove_listener(MATCH_ALL, thread_forward_events) return self.Response(stream(), mimetype='text/event-stream') diff --git a/homeassistant/util/eventlet.py b/homeassistant/util/eventlet.py deleted file mode 100644 index 6338935b0c9..00000000000 --- a/homeassistant/util/eventlet.py +++ /dev/null @@ -1,9 +0,0 @@ -"""Eventlet util methods.""" - - -def spawn(hub, func, *args, **kwargs): - """Spawn a function on specified hub.""" - import eventlet - gthread = eventlet.greenthread.GreenThread(hub.greenlet) - hub.schedule_call_global(0, gthread.switch, func, args, kwargs) - return gthread