mirror of
https://github.com/home-assistant/core.git
synced 2025-11-15 14:00:24 +00:00
Add async_get_image helper to Image integration (#152465)
This commit is contained in:
@@ -12,7 +12,7 @@ from typing import Any
|
|||||||
|
|
||||||
import voluptuous as vol
|
import voluptuous as vol
|
||||||
|
|
||||||
from homeassistant.components import camera, conversation, media_source
|
from homeassistant.components import camera, conversation, image, media_source
|
||||||
from homeassistant.components.http.auth import async_sign_path
|
from homeassistant.components.http.auth import async_sign_path
|
||||||
from homeassistant.core import HomeAssistant, ServiceResponse, callback
|
from homeassistant.core import HomeAssistant, ServiceResponse, callback
|
||||||
from homeassistant.exceptions import HomeAssistantError
|
from homeassistant.exceptions import HomeAssistantError
|
||||||
@@ -31,14 +31,14 @@ from .const import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def _save_camera_snapshot(image: camera.Image) -> Path:
|
def _save_camera_snapshot(image_data: camera.Image | image.Image) -> Path:
|
||||||
"""Save camera snapshot to temp file."""
|
"""Save camera snapshot to temp file."""
|
||||||
with tempfile.NamedTemporaryFile(
|
with tempfile.NamedTemporaryFile(
|
||||||
mode="wb",
|
mode="wb",
|
||||||
suffix=mimetypes.guess_extension(image.content_type, False),
|
suffix=mimetypes.guess_extension(image_data.content_type, False),
|
||||||
delete=False,
|
delete=False,
|
||||||
) as temp_file:
|
) as temp_file:
|
||||||
temp_file.write(image.content)
|
temp_file.write(image_data.content)
|
||||||
return Path(temp_file.name)
|
return Path(temp_file.name)
|
||||||
|
|
||||||
|
|
||||||
@@ -54,26 +54,31 @@ async def _resolve_attachments(
|
|||||||
for attachment in attachments or []:
|
for attachment in attachments or []:
|
||||||
media_content_id = attachment["media_content_id"]
|
media_content_id = attachment["media_content_id"]
|
||||||
|
|
||||||
# Special case for camera media sources
|
# Special case for certain media sources
|
||||||
if media_content_id.startswith("media-source://camera/"):
|
for integration in camera, image:
|
||||||
# Extract entity_id from the media content ID
|
media_source_prefix = f"media-source://{integration.DOMAIN}/"
|
||||||
entity_id = media_content_id.removeprefix("media-source://camera/")
|
if not media_content_id.startswith(media_source_prefix):
|
||||||
|
continue
|
||||||
|
|
||||||
# Get snapshot from camera
|
# Extract entity_id from the media content ID
|
||||||
image = await camera.async_get_image(hass, entity_id)
|
entity_id = media_content_id.removeprefix(media_source_prefix)
|
||||||
|
|
||||||
|
# Get snapshot from entity
|
||||||
|
image_data = await integration.async_get_image(hass, entity_id)
|
||||||
|
|
||||||
temp_filename = await hass.async_add_executor_job(
|
temp_filename = await hass.async_add_executor_job(
|
||||||
_save_camera_snapshot, image
|
_save_camera_snapshot, image_data
|
||||||
)
|
)
|
||||||
created_files.append(temp_filename)
|
created_files.append(temp_filename)
|
||||||
|
|
||||||
resolved_attachments.append(
|
resolved_attachments.append(
|
||||||
conversation.Attachment(
|
conversation.Attachment(
|
||||||
media_content_id=media_content_id,
|
media_content_id=media_content_id,
|
||||||
mime_type=image.content_type,
|
mime_type=image_data.content_type,
|
||||||
path=temp_filename,
|
path=temp_filename,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
break
|
||||||
else:
|
else:
|
||||||
# Handle regular media sources
|
# Handle regular media sources
|
||||||
media = await media_source.async_resolve_media(hass, media_content_id, None)
|
media = await media_source.async_resolve_media(hass, media_content_id, None)
|
||||||
|
|||||||
@@ -105,6 +105,20 @@ async def _async_get_image(image_entity: ImageEntity, timeout: int) -> Image:
|
|||||||
raise HomeAssistantError("Unable to get image")
|
raise HomeAssistantError("Unable to get image")
|
||||||
|
|
||||||
|
|
||||||
|
async def async_get_image(
|
||||||
|
hass: HomeAssistant,
|
||||||
|
entity_id: str,
|
||||||
|
timeout: int = 10,
|
||||||
|
) -> Image:
|
||||||
|
"""Fetch an image from an image entity."""
|
||||||
|
component = hass.data[DATA_COMPONENT]
|
||||||
|
|
||||||
|
if (image := component.get_entity(entity_id)) is None:
|
||||||
|
raise HomeAssistantError(f"Image entity {entity_id} not found")
|
||||||
|
|
||||||
|
return await _async_get_image(image, timeout)
|
||||||
|
|
||||||
|
|
||||||
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
||||||
"""Set up the image component."""
|
"""Set up the image component."""
|
||||||
component = hass.data[DATA_COMPONENT] = EntityComponent[ImageEntity](
|
component = hass.data[DATA_COMPONENT] = EntityComponent[ImageEntity](
|
||||||
|
|||||||
@@ -187,7 +187,11 @@ async def test_generate_data_mixed_attachments(
|
|||||||
patch(
|
patch(
|
||||||
"homeassistant.components.camera.async_get_image",
|
"homeassistant.components.camera.async_get_image",
|
||||||
return_value=Image(content_type="image/jpeg", content=b"fake_camera_jpeg"),
|
return_value=Image(content_type="image/jpeg", content=b"fake_camera_jpeg"),
|
||||||
) as mock_get_image,
|
) as mock_get_camera_image,
|
||||||
|
patch(
|
||||||
|
"homeassistant.components.image.async_get_image",
|
||||||
|
return_value=Image(content_type="image/jpeg", content=b"fake_image_jpeg"),
|
||||||
|
) as mock_get_image_image,
|
||||||
patch(
|
patch(
|
||||||
"homeassistant.components.media_source.async_resolve_media",
|
"homeassistant.components.media_source.async_resolve_media",
|
||||||
return_value=media_source.PlayMedia(
|
return_value=media_source.PlayMedia(
|
||||||
@@ -207,6 +211,10 @@ async def test_generate_data_mixed_attachments(
|
|||||||
"media_content_id": "media-source://camera/camera.front_door",
|
"media_content_id": "media-source://camera/camera.front_door",
|
||||||
"media_content_type": "image/jpeg",
|
"media_content_type": "image/jpeg",
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"media_content_id": "media-source://image/image.floorplan",
|
||||||
|
"media_content_type": "image/jpeg",
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"media_content_id": "media-source://media_player/video.mp4",
|
"media_content_id": "media-source://media_player/video.mp4",
|
||||||
"media_content_type": "video/mp4",
|
"media_content_type": "video/mp4",
|
||||||
@@ -215,7 +223,8 @@ async def test_generate_data_mixed_attachments(
|
|||||||
)
|
)
|
||||||
|
|
||||||
# Verify both methods were called
|
# Verify both methods were called
|
||||||
mock_get_image.assert_called_once_with(hass, "camera.front_door")
|
mock_get_camera_image.assert_called_once_with(hass, "camera.front_door")
|
||||||
|
mock_get_image_image.assert_called_once_with(hass, "image.floorplan")
|
||||||
mock_resolve_media.assert_called_once_with(
|
mock_resolve_media.assert_called_once_with(
|
||||||
hass, "media-source://media_player/video.mp4", None
|
hass, "media-source://media_player/video.mp4", None
|
||||||
)
|
)
|
||||||
@@ -224,7 +233,7 @@ async def test_generate_data_mixed_attachments(
|
|||||||
assert len(mock_ai_task_entity.mock_generate_data_tasks) == 1
|
assert len(mock_ai_task_entity.mock_generate_data_tasks) == 1
|
||||||
task = mock_ai_task_entity.mock_generate_data_tasks[0]
|
task = mock_ai_task_entity.mock_generate_data_tasks[0]
|
||||||
assert task.attachments is not None
|
assert task.attachments is not None
|
||||||
assert len(task.attachments) == 2
|
assert len(task.attachments) == 3
|
||||||
|
|
||||||
# Check camera attachment
|
# Check camera attachment
|
||||||
camera_attachment = task.attachments[0]
|
camera_attachment = task.attachments[0]
|
||||||
@@ -240,6 +249,18 @@ async def test_generate_data_mixed_attachments(
|
|||||||
content = await hass.async_add_executor_job(camera_attachment.path.read_bytes)
|
content = await hass.async_add_executor_job(camera_attachment.path.read_bytes)
|
||||||
assert content == b"fake_camera_jpeg"
|
assert content == b"fake_camera_jpeg"
|
||||||
|
|
||||||
|
# Check image attachment
|
||||||
|
image_attachment = task.attachments[1]
|
||||||
|
assert image_attachment.media_content_id == "media-source://image/image.floorplan"
|
||||||
|
assert image_attachment.mime_type == "image/jpeg"
|
||||||
|
assert isinstance(image_attachment.path, Path)
|
||||||
|
assert image_attachment.path.suffix == ".jpg"
|
||||||
|
|
||||||
|
# Verify image snapshot content
|
||||||
|
assert image_attachment.path.exists()
|
||||||
|
content = await hass.async_add_executor_job(image_attachment.path.read_bytes)
|
||||||
|
assert content == b"fake_image_jpeg"
|
||||||
|
|
||||||
# Trigger clean up
|
# Trigger clean up
|
||||||
async_fire_time_changed(
|
async_fire_time_changed(
|
||||||
hass,
|
hass,
|
||||||
@@ -249,9 +270,10 @@ async def test_generate_data_mixed_attachments(
|
|||||||
|
|
||||||
# Verify the temporary file cleaned up
|
# Verify the temporary file cleaned up
|
||||||
assert not camera_attachment.path.exists()
|
assert not camera_attachment.path.exists()
|
||||||
|
assert not image_attachment.path.exists()
|
||||||
|
|
||||||
# Check regular media attachment
|
# Check regular media attachment
|
||||||
media_attachment = task.attachments[1]
|
media_attachment = task.attachments[2]
|
||||||
assert media_attachment.media_content_id == "media-source://media_player/video.mp4"
|
assert media_attachment.media_content_id == "media-source://media_player/video.mp4"
|
||||||
assert media_attachment.mime_type == "video/mp4"
|
assert media_attachment.mime_type == "video/mp4"
|
||||||
assert media_attachment.path == Path("/media/test.mp4")
|
assert media_attachment.path == Path("/media/test.mp4")
|
||||||
|
|||||||
@@ -407,6 +407,15 @@ async def test_image_stream(
|
|||||||
await close_future
|
await close_future
|
||||||
|
|
||||||
|
|
||||||
|
async def test_get_image_action(hass: HomeAssistant, mock_image_platform: None) -> None:
|
||||||
|
"""Test get_image action."""
|
||||||
|
image_data = await image.async_get_image(hass, "image.test")
|
||||||
|
assert image_data == image.Image(content_type="image/jpeg", content=b"Test")
|
||||||
|
|
||||||
|
with pytest.raises(HomeAssistantError, match="not found"):
|
||||||
|
await image.async_get_image(hass, "image.unknown")
|
||||||
|
|
||||||
|
|
||||||
async def test_snapshot_service(hass: HomeAssistant) -> None:
|
async def test_snapshot_service(hass: HomeAssistant) -> None:
|
||||||
"""Test snapshot service."""
|
"""Test snapshot service."""
|
||||||
mopen = mock_open()
|
mopen = mock_open()
|
||||||
|
|||||||
Reference in New Issue
Block a user