Add config flow to compensation helper

This commit is contained in:
G Johansson 2025-07-02 06:35:44 +00:00
parent 8330ae2d3a
commit 2f43b44443
9 changed files with 406 additions and 59 deletions

View File

@ -12,6 +12,7 @@ from homeassistant.components.sensor import (
DOMAIN as SENSOR_DOMAIN,
STATE_CLASSES_SCHEMA as SENSOR_STATE_CLASSES_SCHEMA,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
CONF_ATTRIBUTE,
CONF_DEVICE_CLASS,
@ -23,6 +24,7 @@ from homeassistant.const import (
CONF_UNIT_OF_MEASUREMENT,
)
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryError
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.discovery import async_load_platform
from homeassistant.helpers.typing import ConfigType
@ -39,6 +41,7 @@ from .const import (
DEFAULT_DEGREE,
DEFAULT_PRECISION,
DOMAIN,
PLATFORMS,
)
_LOGGER = logging.getLogger(__name__)
@ -87,59 +90,103 @@ CONFIG_SCHEMA = vol.Schema(
)
async def create_compensation_data(
hass: HomeAssistant, compensation: str, conf: ConfigType, should_raise: bool = False
) -> None:
"""Create compensation data."""
_LOGGER.debug("Setup %s.%s", DOMAIN, compensation)
degree = conf[CONF_DEGREE]
initial_coefficients: list[tuple[float, float]] = conf[CONF_DATAPOINTS]
sorted_coefficients = sorted(initial_coefficients, key=itemgetter(0))
# get x values and y values from the x,y point pairs
x_values, y_values = zip(*initial_coefficients, strict=False)
# try to get valid coefficients for a polynomial
coefficients = None
with np.errstate(all="raise"):
try:
coefficients = np.polyfit(x_values, y_values, degree)
except FloatingPointError as error:
_LOGGER.error(
"Setup of %s encountered an error, %s",
compensation,
error,
)
if should_raise:
raise ConfigEntryError(
translation_domain=DOMAIN,
translation_key="setup_error",
translation_placeholders={
"title": conf[CONF_NAME],
"error": str(error),
},
) from error
if coefficients is not None:
data = {
k: v for k, v in conf.items() if k not in [CONF_DEGREE, CONF_DATAPOINTS]
}
data[CONF_POLYNOMIAL] = np.poly1d(coefficients)
if data[CONF_LOWER_LIMIT]:
data[CONF_MINIMUM] = sorted_coefficients[0]
else:
data[CONF_MINIMUM] = None
if data[CONF_UPPER_LIMIT]:
data[CONF_MAXIMUM] = sorted_coefficients[-1]
else:
data[CONF_MAXIMUM] = None
hass.data[DATA_COMPENSATION][compensation] = data
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the Compensation sensor."""
hass.data[DATA_COMPENSATION] = {}
if DOMAIN not in config:
return True
for compensation, conf in config[DOMAIN].items():
_LOGGER.debug("Setup %s.%s", DOMAIN, compensation)
degree = conf[CONF_DEGREE]
initial_coefficients: list[tuple[float, float]] = conf[CONF_DATAPOINTS]
sorted_coefficients = sorted(initial_coefficients, key=itemgetter(0))
# get x values and y values from the x,y point pairs
x_values, y_values = zip(*initial_coefficients, strict=False)
# try to get valid coefficients for a polynomial
coefficients = None
with np.errstate(all="raise"):
try:
coefficients = np.polyfit(x_values, y_values, degree)
except FloatingPointError as error:
_LOGGER.error(
"Setup of %s encountered an error, %s",
compensation,
error,
)
if coefficients is not None:
data = {
k: v for k, v in conf.items() if k not in [CONF_DEGREE, CONF_DATAPOINTS]
}
data[CONF_POLYNOMIAL] = np.poly1d(coefficients)
if data[CONF_LOWER_LIMIT]:
data[CONF_MINIMUM] = sorted_coefficients[0]
else:
data[CONF_MINIMUM] = None
if data[CONF_UPPER_LIMIT]:
data[CONF_MAXIMUM] = sorted_coefficients[-1]
else:
data[CONF_MAXIMUM] = None
hass.data[DATA_COMPENSATION][compensation] = data
hass.async_create_task(
async_load_platform(
hass,
SENSOR_DOMAIN,
DOMAIN,
{CONF_COMPENSATION: compensation},
config,
)
await create_compensation_data(hass, compensation, conf)
hass.async_create_task(
async_load_platform(
hass,
SENSOR_DOMAIN,
DOMAIN,
{CONF_COMPENSATION: compensation},
config,
)
)
return True
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up Compensation from a config entry."""
config = dict(entry.options)
data_points = config[CONF_DATAPOINTS]
new_data_points = []
for data_point in data_points:
values = data_point.split(",", maxsplit=1)
new_data_points.append([float(values[0]), float(values[1])])
config[CONF_DATAPOINTS] = new_data_points
await create_compensation_data(hass, entry.entry_id, config, True)
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
entry.async_on_unload(entry.add_update_listener(update_listener))
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload Compensation config entry."""
return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
async def update_listener(hass: HomeAssistant, entry: ConfigEntry) -> None:
"""Handle options update."""
await hass.config_entries.async_reload(entry.entry_id)

View File

@ -0,0 +1,157 @@
"""Config flow for statistics."""
from __future__ import annotations
from collections.abc import Mapping
from typing import Any, cast
import voluptuous as vol
from homeassistant.const import (
CONF_ATTRIBUTE,
CONF_ENTITY_ID,
CONF_NAME,
CONF_UNIT_OF_MEASUREMENT,
)
from homeassistant.helpers.schema_config_entry_flow import (
SchemaCommonFlowHandler,
SchemaConfigFlowHandler,
SchemaFlowError,
SchemaFlowFormStep,
)
from homeassistant.helpers.selector import (
AttributeSelector,
AttributeSelectorConfig,
BooleanSelector,
EntitySelector,
NumberSelector,
NumberSelectorConfig,
NumberSelectorMode,
ObjectSelector,
ObjectSelectorConfig,
ObjectSelectorField,
TextSelector,
)
from .const import (
CONF_DATAPOINTS,
CONF_DEGREE,
CONF_LOWER_LIMIT,
CONF_PRECISION,
CONF_UPPER_LIMIT,
DEFAULT_DEGREE,
DEFAULT_NAME,
DEFAULT_PRECISION,
DOMAIN,
)
async def get_options_schema(handler: SchemaCommonFlowHandler) -> vol.Schema:
"""Get options schema."""
entity_id = handler.options[CONF_ENTITY_ID]
return vol.Schema(
{
vol.Required(CONF_DATAPOINTS): ObjectSelector(
ObjectSelectorConfig(
label_field="uncompensated_value",
description_field="compensated_value",
multiple=True,
translation_key=CONF_DATAPOINTS,
fields={
"uncompensated_value": ObjectSelectorField(
required=True,
selector=TextSelector(),
),
"compensated_value": ObjectSelectorField(
required=True,
selector=TextSelector(),
),
},
)
),
vol.Optional(CONF_ATTRIBUTE): AttributeSelector(
AttributeSelectorConfig(entity_id=entity_id)
),
vol.Optional(CONF_UPPER_LIMIT, default=False): BooleanSelector(),
vol.Optional(CONF_LOWER_LIMIT, default=False): BooleanSelector(),
vol.Optional(CONF_PRECISION, default=DEFAULT_PRECISION): NumberSelector(
NumberSelectorConfig(min=0, step=1, mode=NumberSelectorMode.BOX)
),
vol.Optional(CONF_DEGREE, default=DEFAULT_DEGREE): NumberSelector(
NumberSelectorConfig(min=0, max=7, step=1, mode=NumberSelectorMode.BOX)
),
vol.Optional(CONF_UNIT_OF_MEASUREMENT): TextSelector(),
}
)
def _is_valid_data_points(check_data_points: list[str]) -> bool:
"""Validate data points."""
result = False
for data_point in check_data_points:
if not data_point.find(",") > 0:
return False
values = data_point.split(",", maxsplit=1)
for value in values:
try:
float(value)
except ValueError:
return False
result = True
return result
async def validate_options(
handler: SchemaCommonFlowHandler, user_input: dict[str, Any]
) -> dict[str, Any]:
"""Validate options selected."""
user_input[CONF_PRECISION] = int(user_input[CONF_PRECISION])
user_input[CONF_DEGREE] = int(user_input[CONF_DEGREE])
if not _is_valid_data_points(user_input[CONF_DATAPOINTS]):
raise SchemaFlowError("incorrect_datapoints")
if len(user_input[CONF_DATAPOINTS]) <= user_input[CONF_DEGREE]:
raise SchemaFlowError("not_enough_datapoints")
handler.parent_handler._async_abort_entries_match({**handler.options, **user_input}) # noqa: SLF001
return user_input
DATA_SCHEMA_SETUP = vol.Schema(
{
vol.Required(CONF_NAME, default=DEFAULT_NAME): TextSelector(),
vol.Required(CONF_ENTITY_ID): EntitySelector(),
}
)
CONFIG_FLOW = {
"user": SchemaFlowFormStep(
schema=DATA_SCHEMA_SETUP,
next_step="options",
),
"options": SchemaFlowFormStep(
schema=get_options_schema,
validate_user_input=validate_options,
),
}
OPTIONS_FLOW = {
"init": SchemaFlowFormStep(
get_options_schema,
validate_user_input=validate_options,
),
}
class CompensationConfigFlowHandler(SchemaConfigFlowHandler, domain=DOMAIN):
"""Handle a config flow for Compensation."""
config_flow = CONFIG_FLOW
options_flow = OPTIONS_FLOW
def async_config_entry_title(self, options: Mapping[str, Any]) -> str:
"""Return config entry title."""
return cast(str, options[CONF_NAME])

View File

@ -1,6 +1,9 @@
"""Compensation constants."""
from homeassistant.const import Platform
DOMAIN = "compensation"
PLATFORMS = [Platform.SENSOR]
SENSOR = "compensation"

View File

@ -2,7 +2,9 @@
"domain": "compensation",
"name": "Compensation",
"codeowners": ["@Petro31"],
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/compensation",
"integration_type": "helper",
"iot_class": "calculated",
"quality_scale": "legacy",
"requirements": ["numpy==2.3.0"]

View File

@ -12,11 +12,13 @@ from homeassistant.components.sensor import (
CONF_STATE_CLASS,
SensorEntity,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
ATTR_DEVICE_CLASS,
ATTR_UNIT_OF_MEASUREMENT,
CONF_ATTRIBUTE,
CONF_DEVICE_CLASS,
CONF_ENTITY_ID,
CONF_MAXIMUM,
CONF_MINIMUM,
CONF_NAME,
@ -33,7 +35,10 @@ from homeassistant.core import (
State,
callback,
)
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.entity_platform import (
AddConfigEntryEntitiesCallback,
AddEntitiesCallback,
)
from homeassistant.helpers.event import async_track_state_change_event
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
@ -77,6 +82,36 @@ async def async_setup_platform(
)
async def async_setup_entry(
hass: HomeAssistant,
entry: ConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up the Compensation sensor entry."""
compensation = entry.entry_id
conf: dict[str, Any] = hass.data[DATA_COMPENSATION][compensation]
source: str = conf[CONF_ENTITY_ID]
attribute: str | None = conf.get(CONF_ATTRIBUTE)
name = entry.title
async_add_entities(
[
CompensationSensor(
entry.entry_id,
name,
source,
attribute,
conf[CONF_PRECISION],
conf[CONF_POLYNOMIAL],
conf.get(CONF_UNIT_OF_MEASUREMENT),
conf[CONF_MINIMUM],
conf[CONF_MAXIMUM],
)
]
)
class CompensationSensor(SensorEntity):
"""Representation of a Compensation sensor."""

View File

@ -0,0 +1,90 @@
{
"config": {
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]"
},
"error": {
"incorrect_datapoints": "Datapoints needs to be provided in the right format, ex. '1.0, 0.0'.",
"not_enough_datapoints": "The number of datapoints needs to be more than the configured degree."
},
"step": {
"user": {
"description": "Add a compensation sensor",
"data": {
"name": "[%key:common::config_flow::data::name%]",
"entity_id": "Entity"
},
"data_description": {
"name": "Name for the created entity.",
"entity_id": "Entity to use as source."
}
},
"options": {
"description": "Read the documention for further details on how to configure the statistics sensor using these options.",
"data": {
"data_points": "Data points",
"attribute": "Attribute",
"upper_limit": "Upper limit",
"lower_limit": "Lower limit",
"precision": "Precision",
"degree": "Degree",
"unit_of_measurement": "Unit of measurement"
},
"data_description": {
"data_points": "Add a collection of data point conversions with uncompensated value and the compensated value. The number of required data point sets is equal to the polynomial degree + 1.",
"attribute": "Attribute from the source to monitor/compensate.",
"upper_limit": "Enables an upper limit for the sensor. The upper limit is defined by the data collections (data_points) greatest uncompensated value.",
"lower_limit": "Enables a lower limit for the sensor. The lower limit is defined by the data collections (data_points) lowest uncompensated value.",
"precision": "Defines the precision of the calculated values, through the argument of round().",
"degree": "The degree of a polynomial.",
"unit_of_measurement": "Defines the units of measurement of the sensor, if any."
}
}
}
},
"options": {
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_account%]"
},
"error": {
"incorrect_datapoints": "[%key:component::compensation::config::error::incorrect_datapoints%]",
"not_enough_datapoints": "[%key:component::compensation::config::error::not_enough_datapoints%]"
},
"step": {
"init": {
"description": "[%key:component::compensation::config::step::options::description%]",
"data": {
"data_points": "[%key:component::compensation::config::step::options::data::data_points%]",
"attribute": "[%key:component::compensation::config::step::options::data::attribute%]",
"upper_limit": "[%key:component::compensation::config::step::options::data::upper_limit%]",
"lower_limit": "[%key:component::compensation::config::step::options::data::lower_limit%]",
"precision": "[%key:component::compensation::config::step::options::data::precision%]",
"degree": "[%key:component::compensation::config::step::options::data::degree%]",
"unit_of_measurement": "[%key:component::compensation::config::step::options::data::unit_of_measurement%]"
},
"data_description": {
"data_points": "[%key:component::compensation::config::step::options::data_description::data_points%]",
"attribute": "[%key:component::compensation::config::step::options::data_description::attribute%]",
"upper_limit": "[%key:component::compensation::config::step::options::data_description::upper_limit%]",
"lower_limit": "[%key:component::compensation::config::step::options::data_description::lower_limit%]",
"precision": "[%key:component::compensation::config::step::options::data_description::precision%]",
"degree": "[%key:component::compensation::config::step::options::data_description::degree%]",
"unit_of_measurement": "[%key:component::compensation::config::step::options::data_description::unit_of_measurement%]"
}
}
}
},
"selector": {
"answers": {
"fields": {
"uncompensated_value": "Uncompensated value",
"compensated_value": "Compensated value"
}
}
},
"exceptions": {
"setup_error": {
"message": "Setup of {title} could not be setup due to {error}"
}
}
}

View File

@ -5,6 +5,7 @@ To update, run python3 -m script.hassfest
FLOWS = {
"helper": [
"compensation",
"derivative",
"filter",
"generic_hygrostat",

View File

@ -1072,12 +1072,6 @@
"config_flow": false,
"iot_class": "local_polling"
},
"compensation": {
"name": "Compensation",
"integration_type": "hub",
"config_flow": false,
"iot_class": "calculated"
},
"concord232": {
"name": "Concord232",
"integration_type": "hub",
@ -7733,6 +7727,12 @@
"config_flow": false,
"iot_class": "local_polling"
},
"compensation": {
"name": "Compensation",
"integration_type": "helper",
"config_flow": true,
"iot_class": "calculated"
},
"counter": {
"integration_type": "helper",
"config_flow": false

View File

@ -3,6 +3,7 @@
from __future__ import annotations
from collections.abc import Callable, Mapping, Sequence
from copy import deepcopy
from enum import StrEnum
from functools import cache
import importlib
@ -1128,12 +1129,12 @@ class NumberSelector(Selector[NumberSelectorConfig]):
return value
class ObjectSelectorField(TypedDict):
class ObjectSelectorField(TypedDict, total=False):
"""Class to represent an object selector fields dict."""
label: str
required: bool
selector: dict[str, Any]
selector: Required[Selector | dict]
class ObjectSelectorConfig(BaseSelectorConfig):
@ -1142,7 +1143,7 @@ class ObjectSelectorConfig(BaseSelectorConfig):
fields: dict[str, ObjectSelectorField]
multiple: bool
label_field: str
description_field: bool
description_field: str
translation_key: str
@ -1156,7 +1157,7 @@ class ObjectSelector(Selector[ObjectSelectorConfig]):
{
vol.Optional("fields"): {
str: {
vol.Required("selector"): dict,
vol.Required("selector"): vol.Any(Selector, dict),
vol.Optional("required"): bool,
vol.Optional("label"): str,
}
@ -1172,6 +1173,17 @@ class ObjectSelector(Selector[ObjectSelectorConfig]):
"""Instantiate a selector."""
super().__init__(config)
def serialize(self) -> dict[str, dict[str, ObjectSelectorConfig]]:
"""Serialize Selector for voluptuous_serialize."""
_config = deepcopy(self.config)
if "fields" in _config:
for items in _config["fields"].values():
if isinstance(items["selector"], Selector):
items["selector"] = {
items["selector"].selector_type: items["selector"].config
}
return {"selector": {self.selector_type: _config}}
def __call__(self, data: Any) -> Any:
"""Validate the passed selection."""
return data