diff --git a/homeassistant/components/camera/__init__.py b/homeassistant/components/camera/__init__.py index fd05d3c2095..5d170eece4c 100644 --- a/homeassistant/components/camera/__init__.py +++ b/homeassistant/components/camera/__init__.py @@ -447,6 +447,13 @@ class Camera(Entity): return None return STREAM_TYPE_HLS + @property + def available(self) -> bool: + """Return True if entity is available.""" + if self.stream and not self.stream.available: + return self.stream.available + return super().available + async def create_stream(self) -> Stream | None: """Create a Stream for stream_source.""" # There is at most one stream (a decode worker) per camera @@ -456,6 +463,7 @@ class Camera(Entity): if not source: return None self.stream = create_stream(self.hass, source, options=self.stream_options) + self.stream.set_update_callback(self.async_write_ha_state) return self.stream async def stream_source(self) -> str | None: diff --git a/homeassistant/components/stream/__init__.py b/homeassistant/components/stream/__init__.py index 070dd062e42..2d2e4058460 100644 --- a/homeassistant/components/stream/__init__.py +++ b/homeassistant/components/stream/__init__.py @@ -16,7 +16,7 @@ to always keep workers active. """ from __future__ import annotations -from collections.abc import Mapping +from collections.abc import Callable, Mapping import logging import re import secrets @@ -204,7 +204,8 @@ class Stream: self._thread_quit = threading.Event() self._outputs: dict[str, StreamOutput] = {} self._fast_restart_once = False - self._available = True + self._available: bool = True + self._update_callback: Callable[[], None] | None = None def endpoint_url(self, fmt: str) -> str: """Start the stream and returns a url for the output format.""" @@ -260,6 +261,17 @@ class Stream: """Return False if the stream is started and known to be unavailable.""" return self._available + def set_update_callback(self, update_callback: Callable[[], None]) -> None: + """Set callback to run when state changes.""" + self._update_callback = update_callback + + @callback + def _async_update_state(self, available: bool) -> None: + """Set state and Run callback to notify state has been updated.""" + self._available = available + if self._update_callback: + self._update_callback() + def start(self) -> None: """Start a stream.""" if self._thread is None or not self._thread.is_alive(): @@ -292,8 +304,7 @@ class Stream: wait_timeout = 0 while not self._thread_quit.wait(timeout=wait_timeout): start_time = time.time() - - self._available = True + self.hass.add_job(self._async_update_state, True) try: stream_worker( self.source, @@ -303,7 +314,6 @@ class Stream: ) except StreamWorkerError as err: _LOGGER.error("Error from stream worker: %s", str(err)) - self._available = False stream_state.discontinuity() if not self.keepalive or self._thread_quit.is_set(): @@ -314,6 +324,7 @@ class Stream: continue break + self.hass.add_job(self._async_update_state, False) # To avoid excessive restarts, wait before restarting # As the required recovery time may be different for different setups, start # with trying a short wait_timeout and increase it on each reconnection attempt. diff --git a/tests/components/camera/test_init.py b/tests/components/camera/test_init.py index 3f8f62449ba..13ec69bcd4e 100644 --- a/tests/components/camera/test_init.py +++ b/tests/components/camera/test_init.py @@ -16,7 +16,11 @@ from homeassistant.components.camera.const import ( from homeassistant.components.camera.prefs import CameraEntityPreferences from homeassistant.components.websocket_api.const import TYPE_RESULT from homeassistant.config import async_process_ha_core_config -from homeassistant.const import ATTR_ENTITY_ID, EVENT_HOMEASSISTANT_START +from homeassistant.const import ( + ATTR_ENTITY_ID, + EVENT_HOMEASSISTANT_START, + STATE_UNAVAILABLE, +) from homeassistant.exceptions import HomeAssistantError from homeassistant.setup import async_setup_component @@ -633,3 +637,56 @@ async def test_websocket_web_rtc_offer_invalid_stream_type( assert response["type"] == TYPE_RESULT assert not response["success"] assert response["error"]["code"] == "web_rtc_offer_failed" + + +async def test_state_streaming(hass, hass_ws_client, mock_camera): + """Camera state.""" + demo_camera = hass.states.get("camera.demo_camera") + assert demo_camera is not None + assert demo_camera.state == camera.STATE_STREAMING + + +async def test_stream_unavailable(hass, hass_ws_client, mock_camera, mock_stream): + """Camera state.""" + await async_setup_component(hass, "camera", {}) + + with patch( + "homeassistant.components.camera.Stream.endpoint_url", + return_value="http://home.assistant/playlist.m3u8", + ), patch( + "homeassistant.components.demo.camera.DemoCamera.stream_source", + return_value="http://example.com", + ), patch( + "homeassistant.components.camera.Stream.set_update_callback", + ) as mock_update_callback: + # Request playlist through WebSocket. We just want to create the stream + # but don't care about the result. + client = await hass_ws_client(hass) + await client.send_json( + {"id": 10, "type": "camera/stream", "entity_id": "camera.demo_camera"} + ) + await client.receive_json() + assert mock_update_callback.called + + # Simluate the stream going unavailable + callback = mock_update_callback.call_args.args[0] + with patch( + "homeassistant.components.camera.Stream.available", new_callable=lambda: False + ): + callback() + await hass.async_block_till_done() + + demo_camera = hass.states.get("camera.demo_camera") + assert demo_camera is not None + assert demo_camera.state == STATE_UNAVAILABLE + + # Simulate stream becomes available + with patch( + "homeassistant.components.camera.Stream.available", new_callable=lambda: True + ): + callback() + await hass.async_block_till_done() + + demo_camera = hass.states.get("camera.demo_camera") + assert demo_camera is not None + assert demo_camera.state == camera.STATE_STREAMING diff --git a/tests/components/stream/test_hls.py b/tests/components/stream/test_hls.py index 0f50f830a85..67c7415f509 100644 --- a/tests/components/stream/test_hls.py +++ b/tests/components/stream/test_hls.py @@ -173,6 +173,14 @@ async def test_stream_timeout(hass, hass_client, stream_worker_sync, h264_video) # Setup demo HLS track stream = create_stream(hass, h264_video, {}) + available_states = [] + + def update_callback() -> None: + nonlocal available_states + available_states.append(stream.available) + + stream.set_update_callback(update_callback) + # Request stream stream.add_provider(HLS_PROVIDER) stream.start() @@ -204,6 +212,9 @@ async def test_stream_timeout(hass, hass_client, stream_worker_sync, h264_video) fail_response = await http_client.get(parsed_url.path) assert fail_response.status == HTTPStatus.NOT_FOUND + # Streams only marked as failure when keepalive is true + assert available_states == [True] + async def test_stream_timeout_after_stop( hass, hass_client, stream_worker_sync, h264_video @@ -237,13 +248,21 @@ async def test_stream_keepalive(hass): # Setup demo HLS track source = "test_stream_keepalive_source" stream = create_stream(hass, source, {}) - assert stream.available track = stream.add_provider(HLS_PROVIDER) track.num_segments = 2 + available_states = [] + + def update_callback() -> None: + nonlocal available_states + available_states.append(stream.available) + + stream.set_update_callback(update_callback) + cur_time = 0 def time_side_effect(): + print("stream.available=%s", stream.available) nonlocal cur_time if cur_time >= 80: stream.keepalive = False # Thread should exit and be joinable. @@ -263,11 +282,14 @@ async def test_stream_keepalive(hass): stream._thread.join() stream._thread = None assert av_open.call_count == 2 - assert not stream.available + await hass.async_block_till_done() # Stop stream, if it hasn't quit already stream.stop() - assert not stream.available + + # Stream marked initially available, then marked as failed, then marked available + # before the final failure that exits the stream. + assert available_states == [True, False, True] async def test_hls_playlist_view_no_output(hass, hls_stream):