Clean up EventStream

This commit is contained in:
Paulus Schoutsen 2016-05-16 19:47:15 -07:00
parent 18be276b08
commit 548d415f94
2 changed files with 15 additions and 40 deletions

View File

@ -6,6 +6,7 @@ https://home-assistant.io/developers/api/
""" """
import json import json
import logging import logging
from time import time
import homeassistant.core as ha import homeassistant.core as ha
import homeassistant.remote as rem import homeassistant.remote as rem
@ -21,7 +22,6 @@ from homeassistant.const import (
from homeassistant.exceptions import TemplateError from homeassistant.exceptions import TemplateError
from homeassistant.helpers.state import TrackStates from homeassistant.helpers.state import TrackStates
from homeassistant.helpers import template from homeassistant.helpers import template
from homeassistant.helpers.event import track_utc_time_change
from homeassistant.components.http import HomeAssistantView from homeassistant.components.http import HomeAssistantView
DOMAIN = 'api' DOMAIN = 'api'
@ -65,33 +65,25 @@ class APIStatusView(HomeAssistantView):
class APIEventStream(HomeAssistantView): class APIEventStream(HomeAssistantView):
"""View to handle EventSt requests.""" """View to handle EventStream requests."""
url = URL_API_STREAM url = URL_API_STREAM
name = "api:stream" name = "api:stream"
def get(self, request): def get(self, request):
"""Provide a streaming interface for the event bus.""" """Provide a streaming interface for the event bus."""
from eventlet.queue import Empty from eventlet.queue import LightQueue, Empty
import eventlet import eventlet
import homeassistant.util.eventlet as eventlet_util
cur_hub = eventlet.hubs.get_hub() cur_hub = eventlet.hubs.get_hub()
request.environ['eventlet.minimum_write_chunk_size'] = 0 request.environ['eventlet.minimum_write_chunk_size'] = 0
to_write = LightQueue()
to_write = eventlet.Queue()
stop_obj = object() stop_obj = object()
attached_ping = None
restrict = request.args.get('restrict') restrict = request.args.get('restrict')
if restrict: if restrict:
restrict = restrict.split(',') restrict = restrict.split(',')
def thread_ping(now):
"""Called from time thread to add ping to queue."""
_LOGGER.debug('STREAM %s PING', id(stop_obj))
eventlet_util.spawn(cur_hub, to_write.put, STREAM_PING_PAYLOAD)
def thread_forward_events(event): def thread_forward_events(event):
"""Forward events to the open request.""" """Forward events to the open request."""
if event.event_type == EVENT_TIME_CHANGED: if event.event_type == EVENT_TIME_CHANGED:
@ -104,34 +96,20 @@ class APIEventStream(HomeAssistantView):
else: else:
data = json.dumps(event, cls=rem.JSONEncoder) data = json.dumps(event, cls=rem.JSONEncoder)
eventlet_util.spawn(cur_hub, to_write.put, data) cur_hub.schedule_call_global(0, lambda: to_write.put(data))
def cleanup():
"""Clean up HA listeners."""
_LOGGER.debug("STREAM %s CLEANING UP", id(stop_obj))
self.hass.bus.remove_listener(EVENT_TIME_CHANGED, attached_ping)
if restrict:
for event in restrict:
self.hass.bus.remove_listener(event, thread_forward_events)
else:
self.hass.bus.remove_listener(MATCH_ALL, thread_forward_events)
def stream(): def stream():
"""Stream events to response.""" """Stream events to response."""
nonlocal attached_ping
if restrict: if restrict:
for event_type in restrict: for event_type in restrict:
self.hass.bus.listen(event_type, thread_forward_events) self.hass.bus.listen(event_type, thread_forward_events)
else: else:
self.hass.bus.listen(MATCH_ALL, thread_forward_events) self.hass.bus.listen(MATCH_ALL, thread_forward_events)
attached_ping = track_utc_time_change(
self.hass, thread_ping, second=(0, 30))
_LOGGER.debug('STREAM %s ATTACHED', id(stop_obj)) _LOGGER.debug('STREAM %s ATTACHED', id(stop_obj))
last_msg = time()
while True: while True:
try: try:
# Somehow our queue.get takes too long to # Somehow our queue.get takes too long to
@ -151,13 +129,19 @@ class APIEventStream(HomeAssistantView):
_LOGGER.debug('STREAM %s WRITING %s', id(stop_obj), _LOGGER.debug('STREAM %s WRITING %s', id(stop_obj),
msg.strip()) msg.strip())
yield msg.encode("UTF-8") yield msg.encode("UTF-8")
last_msg = time()
except Empty: except Empty:
pass if time() - last_msg > 50:
to_write.put(STREAM_PING_PAYLOAD)
except GeneratorExit: except GeneratorExit:
_LOGGER.debug('STREAM %s RESPONSE CLOSED', id(stop_obj)) _LOGGER.debug('STREAM %s RESPONSE CLOSED', id(stop_obj))
break break
cleanup() if restrict:
for event in restrict:
self.hass.bus.remove_listener(event, thread_forward_events)
else:
self.hass.bus.remove_listener(MATCH_ALL, thread_forward_events)
return self.Response(stream(), mimetype='text/event-stream') return self.Response(stream(), mimetype='text/event-stream')

View File

@ -1,9 +0,0 @@
"""Eventlet util methods."""
def spawn(hub, func, *args, **kwargs):
"""Spawn a function on specified hub."""
import eventlet
gthread = eventlet.greenthread.GreenThread(hub.greenlet)
hub.schedule_call_global(0, gthread.switch, func, args, kwargs)
return gthread