Mark camera unavailable when keepalive stream fails (#62294)

* Mark camera unavailable when keepalive stream fails

Add a listener in stream that notifies camera when the stream state has changed, and
use that to inform the camera `available` property. Update the property to be set
only from the main loop where it is read to reduce thread safety races.

Issue #54659

* Fix pylint import related errors

* Address lint naming errors

* Apply suggestions from code review

Co-authored-by: Paulus Schoutsen <paulus@home-assistant.io>

Co-authored-by: Paulus Schoutsen <paulus@home-assistant.io>
This commit is contained in:
Allen Porter 2021-12-19 09:09:59 -08:00 committed by GitHub
parent 1ec8619687
commit 647febd7d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 107 additions and 9 deletions

View File

@ -447,6 +447,13 @@ class Camera(Entity):
return None
return STREAM_TYPE_HLS
@property
def available(self) -> bool:
"""Return True if entity is available."""
if self.stream and not self.stream.available:
return self.stream.available
return super().available
async def create_stream(self) -> Stream | None:
"""Create a Stream for stream_source."""
# There is at most one stream (a decode worker) per camera
@ -456,6 +463,7 @@ class Camera(Entity):
if not source:
return None
self.stream = create_stream(self.hass, source, options=self.stream_options)
self.stream.set_update_callback(self.async_write_ha_state)
return self.stream
async def stream_source(self) -> str | None:

View File

@ -16,7 +16,7 @@ to always keep workers active.
"""
from __future__ import annotations
from collections.abc import Mapping
from collections.abc import Callable, Mapping
import logging
import re
import secrets
@ -204,7 +204,8 @@ class Stream:
self._thread_quit = threading.Event()
self._outputs: dict[str, StreamOutput] = {}
self._fast_restart_once = False
self._available = True
self._available: bool = True
self._update_callback: Callable[[], None] | None = None
def endpoint_url(self, fmt: str) -> str:
"""Start the stream and returns a url for the output format."""
@ -260,6 +261,17 @@ class Stream:
"""Return False if the stream is started and known to be unavailable."""
return self._available
def set_update_callback(self, update_callback: Callable[[], None]) -> None:
"""Set callback to run when state changes."""
self._update_callback = update_callback
@callback
def _async_update_state(self, available: bool) -> None:
"""Set state and Run callback to notify state has been updated."""
self._available = available
if self._update_callback:
self._update_callback()
def start(self) -> None:
"""Start a stream."""
if self._thread is None or not self._thread.is_alive():
@ -292,8 +304,7 @@ class Stream:
wait_timeout = 0
while not self._thread_quit.wait(timeout=wait_timeout):
start_time = time.time()
self._available = True
self.hass.add_job(self._async_update_state, True)
try:
stream_worker(
self.source,
@ -303,7 +314,6 @@ class Stream:
)
except StreamWorkerError as err:
_LOGGER.error("Error from stream worker: %s", str(err))
self._available = False
stream_state.discontinuity()
if not self.keepalive or self._thread_quit.is_set():
@ -314,6 +324,7 @@ class Stream:
continue
break
self.hass.add_job(self._async_update_state, False)
# To avoid excessive restarts, wait before restarting
# As the required recovery time may be different for different setups, start
# with trying a short wait_timeout and increase it on each reconnection attempt.

View File

@ -16,7 +16,11 @@ from homeassistant.components.camera.const import (
from homeassistant.components.camera.prefs import CameraEntityPreferences
from homeassistant.components.websocket_api.const import TYPE_RESULT
from homeassistant.config import async_process_ha_core_config
from homeassistant.const import ATTR_ENTITY_ID, EVENT_HOMEASSISTANT_START
from homeassistant.const import (
ATTR_ENTITY_ID,
EVENT_HOMEASSISTANT_START,
STATE_UNAVAILABLE,
)
from homeassistant.exceptions import HomeAssistantError
from homeassistant.setup import async_setup_component
@ -633,3 +637,56 @@ async def test_websocket_web_rtc_offer_invalid_stream_type(
assert response["type"] == TYPE_RESULT
assert not response["success"]
assert response["error"]["code"] == "web_rtc_offer_failed"
async def test_state_streaming(hass, hass_ws_client, mock_camera):
"""Camera state."""
demo_camera = hass.states.get("camera.demo_camera")
assert demo_camera is not None
assert demo_camera.state == camera.STATE_STREAMING
async def test_stream_unavailable(hass, hass_ws_client, mock_camera, mock_stream):
"""Camera state."""
await async_setup_component(hass, "camera", {})
with patch(
"homeassistant.components.camera.Stream.endpoint_url",
return_value="http://home.assistant/playlist.m3u8",
), patch(
"homeassistant.components.demo.camera.DemoCamera.stream_source",
return_value="http://example.com",
), patch(
"homeassistant.components.camera.Stream.set_update_callback",
) as mock_update_callback:
# Request playlist through WebSocket. We just want to create the stream
# but don't care about the result.
client = await hass_ws_client(hass)
await client.send_json(
{"id": 10, "type": "camera/stream", "entity_id": "camera.demo_camera"}
)
await client.receive_json()
assert mock_update_callback.called
# Simluate the stream going unavailable
callback = mock_update_callback.call_args.args[0]
with patch(
"homeassistant.components.camera.Stream.available", new_callable=lambda: False
):
callback()
await hass.async_block_till_done()
demo_camera = hass.states.get("camera.demo_camera")
assert demo_camera is not None
assert demo_camera.state == STATE_UNAVAILABLE
# Simulate stream becomes available
with patch(
"homeassistant.components.camera.Stream.available", new_callable=lambda: True
):
callback()
await hass.async_block_till_done()
demo_camera = hass.states.get("camera.demo_camera")
assert demo_camera is not None
assert demo_camera.state == camera.STATE_STREAMING

View File

@ -173,6 +173,14 @@ async def test_stream_timeout(hass, hass_client, stream_worker_sync, h264_video)
# Setup demo HLS track
stream = create_stream(hass, h264_video, {})
available_states = []
def update_callback() -> None:
nonlocal available_states
available_states.append(stream.available)
stream.set_update_callback(update_callback)
# Request stream
stream.add_provider(HLS_PROVIDER)
stream.start()
@ -204,6 +212,9 @@ async def test_stream_timeout(hass, hass_client, stream_worker_sync, h264_video)
fail_response = await http_client.get(parsed_url.path)
assert fail_response.status == HTTPStatus.NOT_FOUND
# Streams only marked as failure when keepalive is true
assert available_states == [True]
async def test_stream_timeout_after_stop(
hass, hass_client, stream_worker_sync, h264_video
@ -237,13 +248,21 @@ async def test_stream_keepalive(hass):
# Setup demo HLS track
source = "test_stream_keepalive_source"
stream = create_stream(hass, source, {})
assert stream.available
track = stream.add_provider(HLS_PROVIDER)
track.num_segments = 2
available_states = []
def update_callback() -> None:
nonlocal available_states
available_states.append(stream.available)
stream.set_update_callback(update_callback)
cur_time = 0
def time_side_effect():
print("stream.available=%s", stream.available)
nonlocal cur_time
if cur_time >= 80:
stream.keepalive = False # Thread should exit and be joinable.
@ -263,11 +282,14 @@ async def test_stream_keepalive(hass):
stream._thread.join()
stream._thread = None
assert av_open.call_count == 2
assert not stream.available
await hass.async_block_till_done()
# Stop stream, if it hasn't quit already
stream.stop()
assert not stream.available
# Stream marked initially available, then marked as failed, then marked available
# before the final failure that exits the stream.
assert available_states == [True, False, True]
async def test_hls_playlist_view_no_output(hass, hls_stream):