Add image platform to the template integration (#94928)

* Add image platform to the template integration

* Set a default name

* Fix typo

* Add tests

* Improve test coverage

* Derive content-type from fetched image
This commit is contained in:
Erik Montnemery 2023-06-26 15:54:51 +02:00 committed by GitHub
parent 8fda56d2c9
commit 8e2ba81995
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 809 additions and 1 deletions

View File

@ -129,7 +129,7 @@ class ImageEntity(Entity):
return self._attr_content_type
@property
def entity_picture(self) -> str:
def entity_picture(self) -> str | None:
"""Return a link to the image as entity picture."""
if self._attr_entity_picture is not None:
return self._attr_entity_picture

View File

@ -5,6 +5,7 @@ import voluptuous as vol
from homeassistant.components.binary_sensor import DOMAIN as BINARY_SENSOR_DOMAIN
from homeassistant.components.button import DOMAIN as BUTTON_DOMAIN
from homeassistant.components.image import DOMAIN as IMAGE_DOMAIN
from homeassistant.components.number import DOMAIN as NUMBER_DOMAIN
from homeassistant.components.select import DOMAIN as SELECT_DOMAIN
from homeassistant.components.sensor import DOMAIN as SENSOR_DOMAIN
@ -16,6 +17,7 @@ from homeassistant.helpers.trigger import async_validate_trigger_config
from . import (
binary_sensor as binary_sensor_platform,
button as button_platform,
image as image_platform,
number as number_platform,
select as select_platform,
sensor as sensor_platform,
@ -49,6 +51,9 @@ CONFIG_SECTION_SCHEMA = vol.Schema(
vol.Optional(BUTTON_DOMAIN): vol.All(
cv.ensure_list, [button_platform.BUTTON_SCHEMA]
),
vol.Optional(IMAGE_DOMAIN): vol.All(
cv.ensure_list, [image_platform.IMAGE_SCHEMA]
),
}
)

View File

@ -16,6 +16,7 @@ PLATFORMS = [
Platform.BUTTON,
Platform.COVER,
Platform.FAN,
Platform.IMAGE,
Platform.LIGHT,
Platform.LOCK,
Platform.NUMBER,

View File

@ -0,0 +1,198 @@
"""Support for image which integrates with other components."""
from __future__ import annotations
import logging
from typing import Any
import httpx
import voluptuous as vol
from homeassistant.components.image import (
DOMAIN as IMAGE_DOMAIN,
ImageEntity,
)
from homeassistant.const import CONF_UNIQUE_ID, CONF_URL, CONF_VERIFY_SSL
from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import TemplateError
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.httpx_client import get_async_client
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from homeassistant.util import dt as dt_util
from . import TriggerUpdateCoordinator
from .const import CONF_PICTURE
from .template_entity import (
TemplateEntity,
make_template_entity_common_schema,
)
from .trigger_entity import TriggerEntity
_LOGGER = logging.getLogger(__name__)
DEFAULT_NAME = "Template Image"
GET_IMAGE_TIMEOUT = 10
IMAGE_SCHEMA = vol.Schema(
{
vol.Required(CONF_URL): cv.template,
vol.Optional(CONF_VERIFY_SSL, default=True): bool,
}
).extend(make_template_entity_common_schema(DEFAULT_NAME).schema)
async def _async_create_entities(
hass: HomeAssistant, definitions: list[dict[str, Any]], unique_id_prefix: str | None
) -> list[StateImageEntity]:
"""Create the template image."""
entities = []
for definition in definitions:
unique_id = definition.get(CONF_UNIQUE_ID)
if unique_id and unique_id_prefix:
unique_id = f"{unique_id_prefix}-{unique_id}"
entities.append(StateImageEntity(hass, definition, unique_id))
return entities
async def async_setup_platform(
hass: HomeAssistant,
config: ConfigType,
async_add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Set up the template image."""
if discovery_info is None:
_LOGGER.warning(
"Template image entities can only be configured under template:"
)
return
if "coordinator" in discovery_info:
async_add_entities(
TriggerImageEntity(hass, discovery_info["coordinator"], config)
for config in discovery_info["entities"]
)
return
async_add_entities(
await _async_create_entities(
hass, discovery_info["entities"], discovery_info["unique_id"]
)
)
class TemplateImage(ImageEntity):
"""Base class for templated image."""
_last_image: bytes | None = None
_url: str | None = None
_verify_ssl: bool
def __init__(self, verify_ssl: bool) -> None:
"""Initialize the image."""
super().__init__()
self._verify_ssl = verify_ssl
async def async_image(self) -> bytes | None:
"""Return bytes of image."""
if self._last_image:
return self._last_image
if not (url := self._url):
return None
try:
async_client = get_async_client(self.hass, verify_ssl=self._verify_ssl)
response = await async_client.get(
url, timeout=GET_IMAGE_TIMEOUT, follow_redirects=True
)
response.raise_for_status()
self._attr_content_type = response.headers["content-type"]
self._last_image = response.content
return self._last_image
except httpx.TimeoutException:
_LOGGER.error("%s: Timeout getting image from %s", self.entity_id, url)
return None
except (httpx.RequestError, httpx.HTTPStatusError) as err:
_LOGGER.error(
"%s: Error getting new image from %s: %s",
self.entity_id,
url,
err,
)
return None
class StateImageEntity(TemplateEntity, TemplateImage):
"""Representation of a template image."""
_attr_should_poll = False
def __init__(
self,
hass: HomeAssistant,
config: dict[str, Any],
unique_id: str | None,
) -> None:
"""Initialize the image."""
TemplateEntity.__init__(self, hass, config=config, unique_id=unique_id)
TemplateImage.__init__(self, config[CONF_VERIFY_SSL])
self._url_template = config[CONF_URL]
@property
def entity_picture(self) -> str | None:
"""Return entity picture."""
# mypy doesn't know about fget: https://github.com/python/mypy/issues/6185
if self._entity_picture_template:
return TemplateEntity.entity_picture.fget(self) # type: ignore[attr-defined]
return ImageEntity.entity_picture.fget(self) # type: ignore[attr-defined]
@callback
def _update_url(self, result):
if isinstance(result, TemplateError):
self._url = None
return
self._attr_image_last_updated = dt_util.utcnow()
self._last_image = None
self._url = result
async def async_added_to_hass(self) -> None:
"""Register callbacks."""
self.add_template_attribute("_url", self._url_template, None, self._update_url)
await super().async_added_to_hass()
class TriggerImageEntity(TriggerEntity, TemplateImage):
"""Image entity based on trigger data."""
_last_image: bytes | None = None
domain = IMAGE_DOMAIN
extra_template_keys = (CONF_URL,)
def __init__(
self,
hass: HomeAssistant,
coordinator: TriggerUpdateCoordinator,
config: dict,
) -> None:
"""Initialize the entity."""
TriggerEntity.__init__(self, hass, coordinator, config)
TemplateImage.__init__(self, config[CONF_VERIFY_SSL])
@property
def entity_picture(self) -> str | None:
"""Return entity picture."""
# mypy doesn't know about fget: https://github.com/python/mypy/issues/6185
if CONF_PICTURE in self._config:
return TriggerEntity.entity_picture.fget(self) # type: ignore[attr-defined]
return ImageEntity.entity_picture.fget(self) # type: ignore[attr-defined]
@callback
def _process_data(self) -> None:
"""Process new data."""
super()._process_data()
self._attr_image_last_updated = dt_util.utcnow()
self._last_image = None
self._url = self._rendered.get(CONF_URL)

View File

@ -18,6 +18,7 @@ from homeassistant.helpers.template import Template
from homeassistant.helpers.template_entity import ( # noqa: F401 pylint: disable=unused-import
TEMPLATE_ENTITY_BASE_SCHEMA,
TemplateEntity,
make_template_entity_base_schema,
)
from .const import (
@ -47,6 +48,17 @@ TEMPLATE_ENTITY_COMMON_SCHEMA = vol.Schema(
}
).extend(TEMPLATE_ENTITY_BASE_SCHEMA.schema)
def make_template_entity_common_schema(default_name: str) -> vol.Schema:
"""Return a schema with default name."""
return vol.Schema(
{
vol.Optional(CONF_ATTRIBUTES): vol.Schema({cv.string: cv.template}),
vol.Optional(CONF_AVAILABILITY): cv.template,
}
).extend(make_template_entity_base_schema(default_name).schema)
TEMPLATE_ENTITY_ATTRIBUTES_SCHEMA_LEGACY = vol.Schema(
{
vol.Optional(CONF_ATTRIBUTE_TEMPLATES, default={}): vol.Schema(

View File

@ -65,6 +65,19 @@ TEMPLATE_ENTITY_BASE_SCHEMA = vol.Schema(
}
)
def make_template_entity_base_schema(default_name: str) -> vol.Schema:
"""Return a schema with default name."""
return vol.Schema(
{
vol.Optional(CONF_ICON): cv.template,
vol.Optional(CONF_NAME, default=default_name): cv.template,
vol.Optional(CONF_PICTURE): cv.template,
vol.Optional(CONF_UNIQUE_ID): cv.string,
}
)
TEMPLATE_SENSOR_BASE_SCHEMA = vol.Schema(
{
vol.Optional(CONF_DEVICE_CLASS): DEVICE_CLASSES_SCHEMA,

View File

@ -0,0 +1,579 @@
"""The tests for the Template image platform."""
from http import HTTPStatus
from io import BytesIO
from typing import Any
import httpx
from PIL import Image
import pytest
import respx
from homeassistant import setup
from homeassistant.components.input_text import (
ATTR_VALUE as INPUT_TEXT_ATTR_VALUE,
DOMAIN as INPUT_TEXT_DOMAIN,
SERVICE_SET_VALUE as INPUT_TEXT_SERVICE_SET_VALUE,
)
from homeassistant.const import (
ATTR_ENTITY_PICTURE,
CONF_ENTITY_ID,
STATE_UNKNOWN,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_registry import async_get
from homeassistant.util import dt as dt_util
from tests.common import assert_setup_component
from tests.typing import ClientSessionGenerator
_DEFAULT = object()
_TEST_IMAGE = "image.template_image"
_URL_INPUT_TEXT = "input_text.url"
@pytest.fixture
def imgbytes_jpg():
"""Image in RAM for testing."""
buf = BytesIO() # fake image in ram for testing.
Image.new("RGB", (1, 1)).save(buf, format="jpeg")
return bytes(buf.getbuffer())
@pytest.fixture
def imgbytes2_jpg():
"""Image in RAM for testing."""
buf = BytesIO() # fake image in ram for testing.
Image.new("RGB", (1, 1), 100).save(buf, format="jpeg")
return bytes(buf.getbuffer())
async def _assert_state(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
expected_state: str,
expected_image: bytes | None,
entity_id: str = _TEST_IMAGE,
expected_content_type: str = "image/jpeg",
expected_entity_picture: Any = _DEFAULT,
expected_status: HTTPStatus = HTTPStatus.OK,
):
"""Verify image's state."""
state = hass.states.get(entity_id)
attributes = state.attributes
assert state.state == expected_state
if expected_entity_picture is _DEFAULT:
expected_entity_picture = (
f"/api/image_proxy/{entity_id}?token={attributes['access_token']}"
)
assert attributes.get(ATTR_ENTITY_PICTURE) == expected_entity_picture
client = await hass_client()
resp = await client.get(f"/api/image_proxy/{entity_id}")
assert resp.content_type == expected_content_type
assert resp.status == expected_status
body = await resp.read()
assert body == expected_image
@respx.mock
@pytest.mark.freeze_time("2023-04-01 00:00:00+00:00")
async def test_platform_config(
hass: HomeAssistant, hass_client: ClientSessionGenerator, imgbytes_jpg
) -> None:
"""Test configuring under the platform key does not work."""
respx.get("http://example.com").respond(
stream=imgbytes_jpg, content_type="image/jpeg"
)
with assert_setup_component(1, "image"):
assert await setup.async_setup_component(
hass,
"image",
{
"image": {
"platform": "template",
"url": "{{ 'http://example.com' }}",
}
},
)
await hass.async_block_till_done()
await hass.async_start()
await hass.async_block_till_done()
assert len(hass.states.async_all()) == 0
@respx.mock
@pytest.mark.freeze_time("2023-04-01 00:00:00+00:00")
async def test_missing_optional_config(
hass: HomeAssistant, hass_client: ClientSessionGenerator, imgbytes_jpg
) -> None:
"""Test: missing optional template is ok."""
respx.get("http://example.com").respond(
stream=imgbytes_jpg, content_type="image/jpeg"
)
with assert_setup_component(1, "template"):
assert await setup.async_setup_component(
hass,
"template",
{
"template": {
"image": {
"url": "{{ 'http://example.com' }}",
}
}
},
)
await hass.async_block_till_done()
await hass.async_start()
await hass.async_block_till_done()
expected_state = dt_util.utcnow().isoformat()
await _assert_state(hass, hass_client, expected_state, imgbytes_jpg)
assert respx.get("http://example.com").call_count == 1
# Check the image is not refetched
await _assert_state(hass, hass_client, expected_state, imgbytes_jpg)
assert respx.get("http://example.com").call_count == 1
@respx.mock
@pytest.mark.freeze_time("2023-04-01 00:00:00+00:00")
async def test_multiple_configs(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
imgbytes_jpg,
imgbytes2_jpg,
) -> None:
"""Test: multiple image entities get created."""
respx.get("http://example.com").respond(
stream=imgbytes_jpg, content_type="image/jpeg"
)
respx.get("http://example2.com").respond(
stream=imgbytes2_jpg, content_type="image/png"
)
with assert_setup_component(1, "template"):
assert await setup.async_setup_component(
hass,
"template",
{
"template": {
"image": [
{
"url": "{{ 'http://example.com' }}",
},
{
"url": "{{ 'http://example2.com' }}",
},
]
}
},
)
await hass.async_block_till_done()
await hass.async_start()
await hass.async_block_till_done()
expected_state = dt_util.utcnow().isoformat()
await _assert_state(hass, hass_client, expected_state, imgbytes_jpg)
await _assert_state(
hass,
hass_client,
expected_state,
imgbytes2_jpg,
f"{_TEST_IMAGE}_2",
expected_content_type="image/png",
)
async def test_missing_required_keys(hass: HomeAssistant) -> None:
"""Test: missing required fields will fail."""
with assert_setup_component(0, "template"):
assert await setup.async_setup_component(
hass,
"template",
{
"template": {
"image": {
"name": "a name",
}
}
},
)
await hass.async_block_till_done()
await hass.async_start()
await hass.async_block_till_done()
assert hass.states.async_all("image") == []
async def test_unique_id(hass: HomeAssistant) -> None:
"""Test unique_id configuration."""
with assert_setup_component(1, "template"):
assert await setup.async_setup_component(
hass,
"template",
{
"template": {
"unique_id": "b",
"image": {
"url": "http://example.com",
"unique_id": "a",
},
}
},
)
await hass.async_block_till_done()
await hass.async_start()
await hass.async_block_till_done()
ent_reg = async_get(hass)
entry = ent_reg.async_get(_TEST_IMAGE)
assert entry
assert entry.unique_id == "b-a"
@respx.mock
@pytest.mark.freeze_time("2023-04-01 00:00:00+00:00")
async def test_custom_entity_picture(
hass: HomeAssistant, hass_client: ClientSessionGenerator, imgbytes_jpg
) -> None:
"""Test custom entity picture."""
respx.get("http://example.com").respond(
stream=imgbytes_jpg, content_type="image/jpeg"
)
with assert_setup_component(1, "template"):
assert await setup.async_setup_component(
hass,
"template",
{
"template": {
"image": {
"url": "http://example.com",
"picture": "http://example2.com",
},
}
},
)
await hass.async_block_till_done()
await hass.async_start()
await hass.async_block_till_done()
expected_state = dt_util.utcnow().isoformat()
await _assert_state(
hass,
hass_client,
expected_state,
imgbytes_jpg,
expected_entity_picture="http://example2.com",
)
@respx.mock
@pytest.mark.freeze_time("2023-04-01 00:00:00+00:00")
async def test_http_error(
hass: HomeAssistant, hass_client: ClientSessionGenerator
) -> None:
"""Test handling http error."""
respx.get("http://example.com").respond(HTTPStatus.NOT_FOUND)
with assert_setup_component(1, "template"):
assert await setup.async_setup_component(
hass,
"template",
{
"template": {
"image": {
"url": "http://example.com",
},
}
},
)
await hass.async_block_till_done()
await hass.async_start()
await hass.async_block_till_done()
expected_state = dt_util.utcnow().isoformat()
await _assert_state(
hass,
hass_client,
expected_state,
b"500: Internal Server Error",
expected_status=HTTPStatus.INTERNAL_SERVER_ERROR,
expected_content_type="text/plain",
)
@respx.mock
@pytest.mark.freeze_time("2023-04-01 00:00:00+00:00")
async def test_http_timeout(
hass: HomeAssistant, hass_client: ClientSessionGenerator
) -> None:
"""Test handling http timeout."""
respx.get("http://example.com").side_effect = httpx.TimeoutException
with assert_setup_component(1, "template"):
assert await setup.async_setup_component(
hass,
"template",
{
"template": {
"image": {
"url": "http://example.com",
},
}
},
)
await hass.async_block_till_done()
await hass.async_start()
await hass.async_block_till_done()
expected_state = dt_util.utcnow().isoformat()
await _assert_state(
hass,
hass_client,
expected_state,
b"500: Internal Server Error",
expected_status=HTTPStatus.INTERNAL_SERVER_ERROR,
expected_content_type="text/plain",
)
@respx.mock
async def test_template_error(
hass: HomeAssistant, hass_client: ClientSessionGenerator
) -> None:
"""Test handling template error."""
respx.get("http://example.com").side_effect = httpx.TimeoutException
with assert_setup_component(1, "template"):
assert await setup.async_setup_component(
hass,
"template",
{
"template": {
"image": {
"url": "{{ no_such_variable.url }}",
},
}
},
)
await hass.async_block_till_done()
await hass.async_start()
await hass.async_block_till_done()
await _assert_state(
hass,
hass_client,
STATE_UNKNOWN,
b"500: Internal Server Error",
expected_status=HTTPStatus.INTERNAL_SERVER_ERROR,
expected_content_type="text/plain",
)
@respx.mock
@pytest.mark.freeze_time("2023-04-01 00:00:00+00:00")
async def test_templates_with_entities(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
imgbytes_jpg,
imgbytes2_jpg,
) -> None:
"""Test templates with values from other entities."""
respx.get("http://example.com").respond(
stream=imgbytes_jpg, content_type="image/jpeg"
)
respx.get("http://example2.com").respond(
stream=imgbytes2_jpg, content_type="image/png"
)
with assert_setup_component(1, "input_text"):
assert await setup.async_setup_component(
hass,
"input_text",
{
"input_text": {
"url": {
"initial": "http://example.com",
"name": "url",
},
}
},
)
with assert_setup_component(1, "template"):
assert await setup.async_setup_component(
hass,
"template",
{
"template": {
"image": {
"url": f"{{{{ states('{_URL_INPUT_TEXT}') }}}}",
},
}
},
)
await hass.async_block_till_done()
await hass.async_start()
await hass.async_block_till_done()
expected_state = dt_util.utcnow().isoformat()
await _assert_state(hass, hass_client, expected_state, imgbytes_jpg)
assert respx.get("http://example.com").call_count == 1
# Check the image is not refetched
await _assert_state(hass, hass_client, expected_state, imgbytes_jpg)
assert respx.get("http://example.com").call_count == 1
await hass.services.async_call(
INPUT_TEXT_DOMAIN,
INPUT_TEXT_SERVICE_SET_VALUE,
{CONF_ENTITY_ID: _URL_INPUT_TEXT, INPUT_TEXT_ATTR_VALUE: "http://example2.com"},
blocking=True,
)
await hass.async_block_till_done()
await _assert_state(
hass,
hass_client,
expected_state,
imgbytes2_jpg,
expected_content_type="image/png",
)
@respx.mock
@pytest.mark.freeze_time("2023-04-01 00:00:00+00:00")
async def test_trigger_image(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
imgbytes_jpg,
imgbytes2_jpg,
) -> None:
"""Test trigger based template image."""
respx.get("http://example.com").respond(
stream=imgbytes_jpg, content_type="image/jpeg"
)
respx.get("http://example2.com").respond(
stream=imgbytes2_jpg, content_type="image/png"
)
assert await setup.async_setup_component(
hass,
"template",
{
"template": [
{
"trigger": {"platform": "event", "event_type": "test_event"},
"image": [
{
"url": "{{ trigger.event.data.url }}",
},
],
},
],
},
)
await hass.async_block_till_done()
await hass.async_start()
await hass.async_block_till_done()
# No image is loaded, expect error
await _assert_state(
hass,
hass_client,
"unknown",
b"500: Internal Server Error",
expected_status=HTTPStatus.INTERNAL_SERVER_ERROR,
expected_content_type="text/plain",
)
hass.bus.async_fire("test_event", {"url": "http://example.com"})
await hass.async_block_till_done()
expected_state = dt_util.utcnow().isoformat()
await _assert_state(hass, hass_client, expected_state, imgbytes_jpg)
assert respx.get("http://example.com").call_count == 1
# Check the image is not refetched
await _assert_state(hass, hass_client, expected_state, imgbytes_jpg)
assert respx.get("http://example.com").call_count == 1
hass.bus.async_fire("test_event", {"url": "http://example2.com"})
await hass.async_block_till_done()
await _assert_state(
hass,
hass_client,
expected_state,
imgbytes2_jpg,
expected_content_type="image/png",
)
@respx.mock
@pytest.mark.freeze_time("2023-04-01 00:00:00+00:00")
async def test_trigger_image_custom_entity_picture(
hass: HomeAssistant, hass_client: ClientSessionGenerator, imgbytes_jpg
) -> None:
"""Test trigger based template image with custom entity picture."""
respx.get("http://example.com").respond(
stream=imgbytes_jpg, content_type="image/jpeg"
)
assert await setup.async_setup_component(
hass,
"template",
{
"template": [
{
"trigger": {"platform": "event", "event_type": "test_event"},
"image": [
{
"url": "{{ trigger.event.data.url }}",
"picture": "http://example2.com",
},
],
},
],
},
)
await hass.async_block_till_done()
await hass.async_start()
await hass.async_block_till_done()
# No image is loaded, expect error
await _assert_state(
hass,
hass_client,
"unknown",
b"500: Internal Server Error",
expected_status=HTTPStatus.INTERNAL_SERVER_ERROR,
expected_entity_picture="http://example2.com",
expected_content_type="text/plain",
)
hass.bus.async_fire("test_event", {"url": "http://example.com"})
await hass.async_block_till_done()
expected_state = dt_util.utcnow().isoformat()
await _assert_state(
hass,
hass_client,
expected_state,
imgbytes_jpg,
expected_entity_picture="http://example2.com",
)