Update nest stream URLs expiration (#46311)

Allen Porter 2021-02-09 23:53:34 -08:00 committed by GitHub
parent 00aebec90d
commit 26f455223b
3 changed files with 96 additions and 12 deletions

View File

@@ -146,13 +146,9 @@ class NestCamera(Camera):
             # Next attempt to catch a url will get a new one
             self._stream = None
             return
-        # Stop any existing stream worker since the url is invalid. The next
-        # request for this stream will restart it with the right url.
-        # Issue #42793 tracks improvements (e.g. preserve keepalive, smoother
-        # transitions across streams)
+        # Update the stream worker with the latest valid url
         if self.stream:
-            self.stream.stop()
-            self.stream = None
+            self.stream.update_source(self._stream.rtsp_stream_url)
         self._schedule_stream_refresh()

     async def async_will_remove_from_hass(self):
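
The camera-side change above is the reason for the rest of the commit: when the Nest RTSP URL is re-issued, the refresh callback now hands the new URL to the already-running stream worker instead of stopping the stream and forcing the next viewer to restart it. Below is a minimal, self-contained sketch of that call-site change; SketchStream and on_rtsp_url_refreshed are hypothetical stand-ins, not the Home Assistant classes.

# Minimal sketch of the refresh flow above (hypothetical names). The point
# is the call-site change: hand the refreshed URL to the running worker
# instead of tearing the stream down.

class SketchStream:
    """Stand-in for the stream integration's Stream object."""

    def __init__(self, source):
        self.source = source

    def update_source(self, new_source):
        # The real implementation also signals the worker thread to restart.
        print(f"restarting worker: {self.source} -> {new_source}")
        self.source = new_source


def on_rtsp_url_refreshed(stream, new_url):
    """Roughly what the camera's refresh callback now does."""
    if stream:
        # Before this commit: stream.stop(); stream = None
        stream.update_source(new_url)
    return stream


stream = SketchStream("rtsps://old-url")
on_rtsp_url_refreshed(stream, "rtsps://new-url")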

View File

@@ -109,8 +109,9 @@ class Stream:
         self.keepalive = False
         self.access_token = None
         self._thread = None
-        self._thread_quit = None
+        self._thread_quit = threading.Event()
         self._outputs = {}
+        self._fast_restart_once = False

         if self.options is None:
             self.options = {}
@@ -167,7 +168,7 @@ class Stream:
                 # The thread must have crashed/exited. Join to clean up the
                 # previous thread.
                 self._thread.join(timeout=0)
-            self._thread_quit = threading.Event()
+            self._thread_quit.clear()
             self._thread = threading.Thread(
                 name="stream_worker",
                 target=self._run_worker,
@@ -175,6 +176,13 @@ class Stream:
             self._thread.start()
             _LOGGER.info("Started stream: %s", self.source)

+    def update_source(self, new_source):
+        """Restart the stream with a new stream source."""
+        _LOGGER.debug("Updating stream source %s", self.source)
+        self.source = new_source
+        self._fast_restart_once = True
+        self._thread_quit.set()
+
     def _run_worker(self):
         """Handle consuming streams and restart keepalive streams."""
         # Keep import here so that we can import stream integration without installing reqs
@@ -186,8 +194,12 @@ class Stream:
             start_time = time.time()
             stream_worker(self.hass, self, self._thread_quit)
             if not self.keepalive or self._thread_quit.is_set():
+                if self._fast_restart_once:
+                    # The stream source is updated, restart without any delay.
+                    self._fast_restart_once = False
+                    self._thread_quit.clear()
+                    continue
                 break
             # To avoid excessive restarts, wait before restarting
             # As the required recovery time may be different for different setups, start
             # with trying a short wait_timeout and increase it on each reconnection attempt.
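
Taken together, the Stream changes above form an Event-based fast-restart pattern: _thread_quit becomes a long-lived threading.Event created in __init__, update_source() records the new source, sets _fast_restart_once, and trips the event to interrupt the current stream_worker run, and _run_worker() then clears the event and reopens immediately instead of breaking out or applying the reconnect backoff. Below is a self-contained sketch of that pattern under simplified assumptions; RestartableWorker and _consume are stand-ins, and the real worker decodes packets via stream_worker rather than polling.

# Sketch of the fast-restart pattern used by the Stream changes (simplified).
import threading
import time


class RestartableWorker:
    def __init__(self, source):
        self.source = source
        self._quit = threading.Event()      # long-lived, like _thread_quit
        self._fast_restart_once = False     # like _fast_restart_once
        self._thread = None

    def start(self):
        self._quit.clear()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def update_source(self, new_source):
        """Ask the running worker to reopen with a new source, skipping backoff."""
        self.source = new_source
        self._fast_restart_once = True
        self._quit.set()                    # interrupts the current run

    def stop(self):
        self._quit.set()
        if self._thread:
            self._thread.join()

    def _consume(self):
        # Stand-in for stream_worker(): runs until told to quit.
        while not self._quit.wait(timeout=0.1):
            pass

    def _run(self):
        while True:
            print("opening", self.source)
            self._consume()
            if self._fast_restart_once:
                # Source changed: clear the event and reopen immediately,
                # instead of breaking out or waiting the reconnect backoff.
                self._fast_restart_once = False
                self._quit.clear()
                continue
            break


worker = RestartableWorker("rtsps://initial-url")
worker.start()
time.sleep(0.2)
worker.update_source("rtsps://refreshed-url")
time.sleep(0.2)
worker.stop()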

View File

@@ -14,6 +14,7 @@ failure modes or corner cases like how out of order packets are handled.
 """
 import fractions
+import io
 import math
 import threading
 from unittest.mock import patch
@@ -44,6 +45,7 @@ LONGER_TEST_SEQUENCE_LENGTH = 20 * VIDEO_FRAME_RATE
 OUT_OF_ORDER_PACKET_INDEX = 3 * VIDEO_FRAME_RATE
 PACKETS_PER_SEGMENT = SEGMENT_DURATION / PACKET_DURATION
 SEGMENTS_PER_PACKET = PACKET_DURATION / SEGMENT_DURATION
+TIMEOUT = 15


 class FakePyAvStream:
@@ -178,9 +180,9 @@ class MockPyAv:
     def open(self, stream_source, *args, **kwargs):
         """Return a stream or buffer depending on args."""
-        if stream_source == STREAM_SOURCE:
-            return self.container
-        return self.capture_buffer
+        if isinstance(stream_source, io.BytesIO):
+            return self.capture_buffer
+        return self.container


 async def async_decode_stream(hass, packets, py_av=None):
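
The MockPyAv.open change deserves a note: the worker opens both the input (the stream source, a URL string) and an in-memory io.BytesIO buffer for its muxed output, and once the tests start swapping the source string the mock can no longer key off the fixed STREAM_SOURCE constant, so it dispatches on type instead. A tiny sketch of that dispatch (names are illustrative, not the real fixture):

import io

INPUT_CONTAINER = "input-container"   # stands in for MockPyAv.container
OUTPUT_BUFFER = "output-buffer"       # stands in for MockPyAv.capture_buffer


def fake_av_open(stream_source, *args, **kwargs):
    # Dispatch on type: any BytesIO is the output buffer the worker muxes
    # segments into; anything else is treated as the input source.
    if isinstance(stream_source, io.BytesIO):
        return OUTPUT_BUFFER
    return INPUT_CONTAINER


assert fake_av_open("rtsps://any-url") == INPUT_CONTAINER
assert fake_av_open(io.BytesIO()) == OUTPUT_BUFFER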
@@ -469,3 +471,77 @@ async def test_pts_out_of_order(hass):
     assert all([s.duration == SEGMENT_DURATION for s in segments])
     assert len(decoded_stream.video_packets) == len(packets)
     assert len(decoded_stream.audio_packets) == 0
+
+
+async def test_stream_stopped_while_decoding(hass):
+    """Tests that the worker quits when stop() is called while decoding."""
+    # Add some synchronization so that the test can pause the background
+    # worker. Once the worker is blocked, the test invokes stop() which
+    # will cause the worker thread to exit once it enters the decode
+    # loop
+    worker_open = threading.Event()
+    worker_wake = threading.Event()
+
+    stream = Stream(hass, STREAM_SOURCE)
+    stream.add_provider(STREAM_OUTPUT_FORMAT)
+
+    py_av = MockPyAv()
+    py_av.container.packets = PacketSequence(TEST_SEQUENCE_LENGTH)
+
+    def blocking_open(stream_source, *args, **kwargs):
+        # Let the test know the thread is running
+        worker_open.set()
+        # Block the worker thread until the test wakes it up
+        worker_wake.wait()
+        return py_av.open(stream_source, args, kwargs)
+
+    with patch("av.open", new=blocking_open):
+        stream.start()
+        assert worker_open.wait(TIMEOUT)
+        # Note: There is a race here where the worker could start as soon
+        # as the wake event is sent, completing all decode work.
+        worker_wake.set()
+        stream.stop()
+
+
+async def test_update_stream_source(hass):
+    """Tests that the worker is re-invoked when the stream source is updated."""
+    worker_open = threading.Event()
+    worker_wake = threading.Event()
+
+    stream = Stream(hass, STREAM_SOURCE)
+    stream.add_provider(STREAM_OUTPUT_FORMAT)
+    # Note that keepalive is not set here. The stream is "restarted" even though
+    # it is not stopping due to failure.
+
+    py_av = MockPyAv()
+    py_av.container.packets = PacketSequence(TEST_SEQUENCE_LENGTH)
+
+    last_stream_source = None
+
+    def blocking_open(stream_source, *args, **kwargs):
+        nonlocal last_stream_source
+        if not isinstance(stream_source, io.BytesIO):
+            last_stream_source = stream_source
+        # Let the test know the thread is running
+        worker_open.set()
+        # Block the worker thread until the test wakes it up
+        worker_wake.wait()
+        return py_av.open(stream_source, args, kwargs)
+
+    with patch("av.open", new=blocking_open):
+        stream.start()
+        assert worker_open.wait(TIMEOUT)
+        assert last_stream_source == STREAM_SOURCE
+
+        # Update the stream source, then the test wakes up the worker and asserts
+        # that it re-opens the new stream source (the test again waits on worker_open)
+        worker_open.clear()
+        stream.update_source(STREAM_SOURCE + "-updated-source")
+        worker_wake.set()
+
+        assert worker_open.wait(TIMEOUT)
+        assert last_stream_source == STREAM_SOURCE + "-updated-source"
+        worker_wake.set()
+
+        # Cleanup
+        stream.stop()
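
Both new tests rely on the same two-event handshake: av.open is patched with a function that records the source, sets worker_open so the test knows the worker thread reached the open call, and then blocks on worker_wake until the test releases it, which lets the test assert deterministically which source the worker opened before letting it proceed. A stripped-down sketch of that handshake with a bare thread, without Home Assistant or pytest fixtures:

# Sketch of the handshake pattern the tests above use, reduced to plain
# threading. A worker thread calls blocking_open; the test observes the
# "opened" event and parks the worker on "wake" until it is ready.
import threading

opened = threading.Event()
wake = threading.Event()
seen_sources = []


def blocking_open(source):
    seen_sources.append(source)
    opened.set()      # tell the test the worker reached open()
    wake.wait()       # park the worker until the test releases it


worker = threading.Thread(target=blocking_open, args=("rtsps://example",))
worker.start()

assert opened.wait(timeout=5)             # worker is now parked inside open()
assert seen_sources == ["rtsps://example"]
wake.set()                                # release the worker
worker.join()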