Use lock in Camera.create_stream (#62757)

Rename create_stream to async_create_stream in Camera
Rename get_image to async_get_image in Stream
Rename get_image to async_get_image in KeyFrameConverter
This commit is contained in:
uvjustin 2021-12-25 04:14:43 +08:00 committed by GitHub
parent 78cc5f8d43
commit 9dbba6b7f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 40 additions and 28 deletions

View File

@ -312,7 +312,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
camera_prefs = prefs.get(camera.entity_id)
if not camera_prefs.preload_stream:
continue
stream = await camera.create_stream()
stream = await camera.async_create_stream()
if not stream:
continue
stream.keepalive = True
@ -390,6 +390,7 @@ class Camera(Entity):
self.access_tokens: collections.deque = collections.deque([], 2)
self._warned_old_signature = False
self.async_update_token()
self._create_stream_lock: asyncio.Lock | None = None
@property
def entity_picture(self) -> str:
@ -454,22 +455,25 @@ class Camera(Entity):
return self.stream.available
return super().available
async def create_stream(self) -> Stream | None:
async def async_create_stream(self) -> Stream | None:
"""Create a Stream for stream_source."""
# There is at most one stream (a decode worker) per camera
if not self.stream:
async with async_timeout.timeout(CAMERA_STREAM_SOURCE_TIMEOUT):
source = await self.stream_source()
if not source:
return None
self.stream = create_stream(
self.hass,
source,
options=self.stream_options,
stream_label=self.entity_id,
)
self.stream.set_update_callback(self.async_write_ha_state)
return self.stream
if not self._create_stream_lock:
self._create_stream_lock = asyncio.Lock()
async with self._create_stream_lock:
if not self.stream:
async with async_timeout.timeout(CAMERA_STREAM_SOURCE_TIMEOUT):
source = await self.stream_source()
if not source:
return None
self.stream = create_stream(
self.hass,
source,
options=self.stream_options,
stream_label=self.entity_id,
)
self.stream.set_update_callback(self.async_write_ha_state)
return self.stream
async def stream_source(self) -> str | None:
"""Return the source of the stream.
@ -918,7 +922,7 @@ async def async_handle_play_stream_service(
async def _async_stream_endpoint_url(
hass: HomeAssistant, camera: Camera, fmt: str
) -> str:
stream = await camera.create_stream()
stream = await camera.async_create_stream()
if not stream:
raise HomeAssistantError(
f"{camera.entity_id} does not support play stream service"
@ -937,7 +941,7 @@ async def async_handle_record_service(
camera: Camera, service_call: ServiceCall
) -> None:
"""Handle stream recording service calls."""
stream = await camera.create_stream()
stream = await camera.async_create_stream()
if not stream:
raise HomeAssistantError(f"{camera.entity_id} does not support record service")

View File

@ -424,11 +424,19 @@ class Stream:
await hls.recv()
recorder.prepend(list(hls.get_segments())[-num_segments:])
async def get_image(
async def async_get_image(
self,
width: int | None = None,
height: int | None = None,
) -> bytes | None:
"""Wrap get_image from KeyFrameConverter."""
"""
Fetch an image from the Stream and return it as a jpeg in bytes.
return await self._keyframe_converter.get_image(width=width, height=height)
Calls async_get_image from KeyFrameConverter. async_get_image should only be
called directly from the main loop and not from an executor thread as it uses
hass.add_executor_job underneath the hood.
"""
return await self._keyframe_converter.async_get_image(
width=width, height=height
)

View File

@ -362,16 +362,16 @@ class StreamView(HomeAssistantView):
class KeyFrameConverter:
"""
Generate and hold the keyframe as a jpeg.
Enables generating and getting an image from the last keyframe seen in the stream.
An overview of the thread and state interaction:
the worker thread sets a packet
at any time, main loop can run a get_image call
get_image is called from the main asyncio loop
get_image schedules _generate_image in an executor thread
_generate_image will try to create an image from the packet
Running _generate_image will clear the packet, so there will only
be one attempt per packet
If successful, _image will be updated and returned by get_image
If unsuccessful, get_image will return the previous image
_generate_image will clear the packet, so there will only be one attempt per packet
If successful, self._image will be updated and returned by get_image
If unsuccessful, get_image will return the previous image
"""
def __init__(self, hass: HomeAssistant) -> None:
@ -430,7 +430,7 @@ class KeyFrameConverter:
bgr_array = frame.to_ndarray(format="bgr24")
self._image = bytes(self._turbojpeg.encode(bgr_array))
async def get_image(
async def async_get_image(
self,
width: int | None = None,
height: int | None = None,

View File

@ -892,6 +892,6 @@ async def test_get_image(hass, record_worker_sync):
await record_worker_sync.join()
assert await stream.get_image() == EMPTY_8_6_JPEG
assert await stream.async_get_image() == EMPTY_8_6_JPEG
stream.stop()