Create preview for history_stats

This commit is contained in:
G Johansson 2025-01-07 22:24:05 +00:00
parent de9c05ad53
commit bda95f4d7a
2 changed files with 119 additions and 1 deletions

View File

@ -3,11 +3,15 @@
from __future__ import annotations
from collections.abc import Mapping
from datetime import timedelta
from typing import Any, cast
import voluptuous as vol
from homeassistant.components import websocket_api
from homeassistant.const import CONF_ENTITY_ID, CONF_NAME, CONF_STATE, CONF_TYPE
from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.schema_config_entry_flow import (
SchemaCommonFlowHandler,
SchemaConfigFlowHandler,
@ -25,6 +29,7 @@ from homeassistant.helpers.selector import (
TextSelector,
TextSelectorConfig,
)
from homeassistant.helpers.template import Template
from .const import (
CONF_DURATION,
@ -36,6 +41,9 @@ from .const import (
DEFAULT_NAME,
DOMAIN,
)
from .coordinator import HistoryStatsUpdateCoordinator
from .data import HistoryStats
from .sensor import HistoryStatsSensor
async def validate_options(
@ -82,12 +90,14 @@ CONFIG_FLOW = {
"options": SchemaFlowFormStep(
schema=DATA_SCHEMA_OPTIONS,
validate_user_input=validate_options,
preview="history_stats",
),
}
OPTIONS_FLOW = {
"init": SchemaFlowFormStep(
DATA_SCHEMA_OPTIONS,
validate_user_input=validate_options,
preview="history_stats",
),
}
@ -101,3 +111,88 @@ class StatisticsConfigFlowHandler(SchemaConfigFlowHandler, domain=DOMAIN):
def async_config_entry_title(self, options: Mapping[str, Any]) -> str:
"""Return config entry title."""
return cast(str, options[CONF_NAME])
@staticmethod
async def async_setup_preview(hass: HomeAssistant) -> None:
"""Set up preview WS API."""
websocket_api.async_register_command(hass, ws_start_preview)
@websocket_api.websocket_command(
{
vol.Required("type"): "history_stats/start_preview",
vol.Required("flow_id"): str,
vol.Required("flow_type"): vol.Any("config_flow", "options_flow"),
vol.Required("user_input"): dict,
}
)
@websocket_api.async_response
async def ws_start_preview(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: dict[str, Any],
) -> None:
"""Generate a preview."""
if msg["flow_type"] == "config_flow":
flow_status = hass.config_entries.flow.async_get(msg["flow_id"])
flow_sets = hass.config_entries.flow._handler_progress_index.get( # noqa: SLF001
flow_status["handler"]
)
options = {}
assert flow_sets
for active_flow in flow_sets:
options = active_flow._common_handler.options # type: ignore [attr-defined] # noqa: SLF001
config_entry = hass.config_entries.async_get_entry(flow_status["handler"])
entity_id = options[CONF_ENTITY_ID]
name = options[CONF_NAME]
sensor_type = options[CONF_TYPE]
else:
flow_status = hass.config_entries.options.async_get(msg["flow_id"])
config_entry = hass.config_entries.async_get_entry(flow_status["handler"])
if not config_entry:
raise HomeAssistantError("Config entry not found")
entity_id = config_entry.options[CONF_ENTITY_ID]
name = config_entry.options[CONF_NAME]
sensor_type = config_entry.options[CONF_TYPE]
@callback
def async_preview_updated(state: str, attributes: Mapping[str, Any]) -> None:
"""Forward config entry state events to websocket."""
connection.send_message(
websocket_api.event_message(
msg["id"], {"attributes": attributes, "state": state}
)
)
entity_id = options[CONF_ENTITY_ID]
entity_states: list[str] = options[CONF_STATE]
start: str | None = options.get(CONF_START)
end: str | None = options.get(CONF_END)
duration: timedelta | None = None
if duration_dict := options.get(CONF_DURATION):
duration = timedelta(**duration_dict)
if sum(param in options for param in CONF_PERIOD_KEYS) != 2:
return
history_stats = HistoryStats(
hass,
entity_id,
entity_states,
Template(start, hass) if start else None,
Template(end, hass) if end else None,
duration,
)
coordinator = HistoryStatsUpdateCoordinator(hass, history_stats, None, name)
await coordinator.async_refresh()
preview_entity = HistoryStatsSensor(
hass, coordinator, sensor_type, name, None, entity_id
)
preview_entity.hass = hass
connection.send_result(msg["id"])
connection.subscriptions[msg["id"]] = await preview_entity.async_start_preview(
async_preview_updated
)

View File

@ -3,6 +3,7 @@
from __future__ import annotations
from abc import abstractmethod
from collections.abc import Callable, Mapping
import datetime
from typing import Any
@ -23,7 +24,7 @@ from homeassistant.const import (
PERCENTAGE,
UnitOfTime,
)
from homeassistant.core import HomeAssistant, callback
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
from homeassistant.exceptions import PlatformNotReady
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.device import async_device_info_to_link_from_entity
@ -183,6 +184,7 @@ class HistoryStatsSensor(HistoryStatsSensorBase):
self._attr_native_unit_of_measurement = UNITS[sensor_type]
self._type = sensor_type
self._attr_unique_id = unique_id
self._source_entity_id = source_entity_id
self._attr_device_info = async_device_info_to_link_from_entity(
hass,
source_entity_id,
@ -192,6 +194,27 @@ class HistoryStatsSensor(HistoryStatsSensorBase):
self._attr_device_class = SensorDeviceClass.DURATION
self._attr_suggested_display_precision = 2
self._preview_callback: Callable[[str, Mapping[str, Any]], None] | None = None
async def async_start_preview(
self,
preview_callback: Callable[[str, Mapping[str, Any]], None],
) -> CALLBACK_TYPE:
"""Render a preview."""
# abort early if there is no entity_id
# as without we can't track changes
# or either size or max_age is not set
if not self._source_entity_id:
self._attr_available = False
calculated_state = self._async_calculate_state()
preview_callback(calculated_state.state, calculated_state.attributes)
return self._call_on_remove_callbacks
self._preview_callback = preview_callback
self._process_update()
return self._call_on_remove_callbacks
@callback
def _process_update(self) -> None:
"""Process an update from the coordinator."""