This commit is contained in:
Paulus Schoutsen
2016-05-14 15:07:20 -07:00
parent 54ecab7590
commit 1096232e17
4 changed files with 88 additions and 39 deletions

View File

@@ -73,51 +73,70 @@ class APIEventStream(HomeAssistantView):
def get(self, request):
"""Provide a streaming interface for the event bus."""
from eventlet import Queue
import eventlet
from eventlet import queue as eventlet_queue
import queue as thread_queue
from threading import Event
from time import time
queue = Queue()
to_write = thread_queue.Queue()
# to_write = eventlet.Queue()
stop_obj = object()
hass = self.hass
connection_closed = Event()
restrict = request.args.get('restrict')
if restrict:
restrict = restrict.split(',')
restrict = False
def ping(now):
"""Add a ping message to queue."""
queue.put(STREAM_PING_PAYLOAD)
print(id(stop_obj), 'ping')
to_write.put(STREAM_PING_PAYLOAD)
def forward_events(event):
"""Forward events to the open request."""
print(id(stop_obj), 'forwarding', event)
if event.event_type == EVENT_TIME_CHANGED:
pass
elif event.event_type == EVENT_HOMEASSISTANT_STOP:
queue.put(stop_obj)
to_write.put(stop_obj)
else:
queue.put(json.dumps(event, cls=rem.JSONEncoder))
to_write.put(json.dumps(event, cls=rem.JSONEncoder))
def stream():
"""Stream events to response."""
if restrict:
for event in restrict:
hass.bus.listen(event, forward_events)
for event_type in restrict:
hass.bus.listen(event_type, forward_events)
else:
hass.bus.listen(MATCH_ALL, forward_events)
attached_ping = track_utc_time_change(hass, ping, second=(0, 30))
attached_ping = track_utc_time_change(
hass, ping, second=(0, 30))
try:
while True:
payload = queue.get()
print(id(stop_obj), 'attached goodness')
while not connection_closed.is_set():
try:
print(id(stop_obj), "Try getting obj")
payload = to_write.get(False)
if payload is stop_obj:
break
msg = "data: {}\n\n".format(payload)
print(id(stop_obj), msg)
yield msg.encode("UTF-8")
except GeneratorExit:
pass
except eventlet_queue.Empty:
print(id(stop_obj), "queue empty, sleep 0.5")
eventlet.sleep(.5)
except GeneratorExit:
pass
print(id(stop_obj), "cleaning up")
hass.bus.remove_listener(EVENT_TIME_CHANGED, attached_ping)
@@ -127,7 +146,29 @@ class APIEventStream(HomeAssistantView):
else:
hass.bus.remove_listener(MATCH_ALL, forward_events)
return self.Response(stream(), mimetype='text/event-stream')
resp = self.Response(stream(), mimetype='text/event-stream')
def closing():
print()
print()
print()
print()
print()
print()
print()
print()
print(id(stop_obj), "CLOSING RESPONSE")
print()
print()
print()
print()
print()
print()
print()
connection_closed.set()
resp.call_on_close(closing)
return resp
class APIConfigView(HomeAssistantView):