diff --git a/homeassistant/components/camera/__init__.py b/homeassistant/components/camera/__init__.py index 45d0cf9371a..5c81ece6141 100644 --- a/homeassistant/components/camera/__init__.py +++ b/homeassistant/components/camera/__init__.py @@ -312,7 +312,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: camera_prefs = prefs.get(camera.entity_id) if not camera_prefs.preload_stream: continue - stream = await camera.create_stream() + stream = await camera.async_create_stream() if not stream: continue stream.keepalive = True @@ -390,6 +390,7 @@ class Camera(Entity): self.access_tokens: collections.deque = collections.deque([], 2) self._warned_old_signature = False self.async_update_token() + self._create_stream_lock: asyncio.Lock | None = None @property def entity_picture(self) -> str: @@ -454,22 +455,25 @@ class Camera(Entity): return self.stream.available return super().available - async def create_stream(self) -> Stream | None: + async def async_create_stream(self) -> Stream | None: """Create a Stream for stream_source.""" # There is at most one stream (a decode worker) per camera - if not self.stream: - async with async_timeout.timeout(CAMERA_STREAM_SOURCE_TIMEOUT): - source = await self.stream_source() - if not source: - return None - self.stream = create_stream( - self.hass, - source, - options=self.stream_options, - stream_label=self.entity_id, - ) - self.stream.set_update_callback(self.async_write_ha_state) - return self.stream + if not self._create_stream_lock: + self._create_stream_lock = asyncio.Lock() + async with self._create_stream_lock: + if not self.stream: + async with async_timeout.timeout(CAMERA_STREAM_SOURCE_TIMEOUT): + source = await self.stream_source() + if not source: + return None + self.stream = create_stream( + self.hass, + source, + options=self.stream_options, + stream_label=self.entity_id, + ) + self.stream.set_update_callback(self.async_write_ha_state) + return self.stream async def stream_source(self) -> str | None: """Return the source of the stream. @@ -918,7 +922,7 @@ async def async_handle_play_stream_service( async def _async_stream_endpoint_url( hass: HomeAssistant, camera: Camera, fmt: str ) -> str: - stream = await camera.create_stream() + stream = await camera.async_create_stream() if not stream: raise HomeAssistantError( f"{camera.entity_id} does not support play stream service" @@ -937,7 +941,7 @@ async def async_handle_record_service( camera: Camera, service_call: ServiceCall ) -> None: """Handle stream recording service calls.""" - stream = await camera.create_stream() + stream = await camera.async_create_stream() if not stream: raise HomeAssistantError(f"{camera.entity_id} does not support record service") diff --git a/homeassistant/components/stream/__init__.py b/homeassistant/components/stream/__init__.py index fec9731136f..1a4ce3d92e8 100644 --- a/homeassistant/components/stream/__init__.py +++ b/homeassistant/components/stream/__init__.py @@ -424,11 +424,19 @@ class Stream: await hls.recv() recorder.prepend(list(hls.get_segments())[-num_segments:]) - async def get_image( + async def async_get_image( self, width: int | None = None, height: int | None = None, ) -> bytes | None: - """Wrap get_image from KeyFrameConverter.""" + """ + Fetch an image from the Stream and return it as a jpeg in bytes. - return await self._keyframe_converter.get_image(width=width, height=height) + Calls async_get_image from KeyFrameConverter. async_get_image should only be + called directly from the main loop and not from an executor thread as it uses + hass.add_executor_job underneath the hood. + """ + + return await self._keyframe_converter.async_get_image( + width=width, height=height + ) diff --git a/homeassistant/components/stream/core.py b/homeassistant/components/stream/core.py index 08397fb6876..91414dd96d9 100644 --- a/homeassistant/components/stream/core.py +++ b/homeassistant/components/stream/core.py @@ -362,16 +362,16 @@ class StreamView(HomeAssistantView): class KeyFrameConverter: """ - Generate and hold the keyframe as a jpeg. + Enables generating and getting an image from the last keyframe seen in the stream. An overview of the thread and state interaction: the worker thread sets a packet - at any time, main loop can run a get_image call + get_image is called from the main asyncio loop + get_image schedules _generate_image in an executor thread _generate_image will try to create an image from the packet - Running _generate_image will clear the packet, so there will only - be one attempt per packet - If successful, _image will be updated and returned by get_image - If unsuccessful, get_image will return the previous image + _generate_image will clear the packet, so there will only be one attempt per packet + If successful, self._image will be updated and returned by get_image + If unsuccessful, get_image will return the previous image """ def __init__(self, hass: HomeAssistant) -> None: @@ -430,7 +430,7 @@ class KeyFrameConverter: bgr_array = frame.to_ndarray(format="bgr24") self._image = bytes(self._turbojpeg.encode(bgr_array)) - async def get_image( + async def async_get_image( self, width: int | None = None, height: int | None = None, diff --git a/tests/components/stream/test_worker.py b/tests/components/stream/test_worker.py index eb50e76a80a..6e35cc65b6f 100644 --- a/tests/components/stream/test_worker.py +++ b/tests/components/stream/test_worker.py @@ -892,6 +892,6 @@ async def test_get_image(hass, record_worker_sync): await record_worker_sync.join() - assert await stream.get_image() == EMPTY_8_6_JPEG + assert await stream.async_get_image() == EMPTY_8_6_JPEG stream.stop()