Add time based integration trigger to Riemann sum integral helper sensor (#110685)

* Schedule max dt for Riemann Integral sensor

* Simplify validation. Dont integrate on change if either old or new state is not numeric.

* Add validation to integration methods. Rollback requirement for both states to be always numeric.

* Use 0 max_dt for disabling time based updates.

* Use docstring instead of pass keyword in abstract methods.

* Use time_period config validation for max_dt

* Use new_state for scheduling max_dt. Only schedule if new state is numeric.

* Use default 0 (None) for max_dt.

* Rename max_dt to max_age.

* Rollback accidental renaming of different file

* Remove unnecessary and nonsensical max value.

* Improve new config description

* Use DurationSelector in config flow

* Rename new config to max_sub_interval

* Simplify by checking once for the integration strategy

* Use positive time period validation of sub interval in platform schema

Co-authored-by: Erik Montnemery <erik@montnemery.com>

* Remove return keyword

Co-authored-by: Erik Montnemery <erik@montnemery.com>

* Simplify scheduling of interval exceeded callback

Co-authored-by: Erik Montnemery <erik@montnemery.com>

* Improve documentation

* Be more clear about when time based integration is disabled.

* Update homeassistant/components/integration/config_flow.py

---------

Co-authored-by: Erik Montnemery <erik@montnemery.com>
This commit is contained in:
Ron Weikamp 2024-05-30 20:40:23 +02:00 committed by GitHub
parent 4d27dd0fb0
commit 43c69c71c2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 279 additions and 6 deletions

View File

@ -26,6 +26,7 @@ from homeassistant.helpers.schema_config_entry_flow import (
)
from .const import (
CONF_MAX_SUB_INTERVAL,
CONF_ROUND_DIGITS,
CONF_SOURCE_SENSOR,
CONF_UNIT_PREFIX,
@ -100,6 +101,9 @@ async def _get_options_dict(handler: SchemaCommonFlowHandler | None) -> dict:
min=0, max=6, mode=selector.NumberSelectorMode.BOX
),
),
vol.Optional(CONF_MAX_SUB_INTERVAL): selector.DurationSelector(
selector.DurationSelectorConfig(allow_negative=False)
),
}

View File

@ -7,6 +7,7 @@ CONF_SOURCE_SENSOR = "source"
CONF_UNIT_OF_MEASUREMENT = "unit"
CONF_UNIT_PREFIX = "unit_prefix"
CONF_UNIT_TIME = "unit_time"
CONF_MAX_SUB_INTERVAL = "max_sub_interval"
METHOD_TRAPEZOIDAL = "trapezoidal"
METHOD_LEFT = "left"

View File

@ -4,7 +4,9 @@ from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from decimal import Decimal, DecimalException, InvalidOperation
from enum import Enum
import logging
from typing import Any, Final, Self
@ -29,6 +31,7 @@ from homeassistant.const import (
UnitOfTime,
)
from homeassistant.core import (
CALLBACK_TYPE,
Event,
EventStateChangedData,
HomeAssistant,
@ -42,10 +45,11 @@ from homeassistant.helpers import (
)
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.event import async_track_state_change_event
from homeassistant.helpers.event import async_call_later, async_track_state_change_event
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from .const import (
CONF_MAX_SUB_INTERVAL,
CONF_ROUND_DIGITS,
CONF_SOURCE_SENSOR,
CONF_UNIT_OF_MEASUREMENT,
@ -87,6 +91,7 @@ PLATFORM_SCHEMA = vol.All(
vol.Optional(CONF_UNIT_PREFIX): vol.In(UNIT_PREFIXES),
vol.Optional(CONF_UNIT_TIME, default=UnitOfTime.HOURS): vol.In(UNIT_TIME),
vol.Remove(CONF_UNIT_OF_MEASUREMENT): cv.string,
vol.Optional(CONF_MAX_SUB_INTERVAL): cv.positive_time_period,
vol.Optional(CONF_METHOD, default=METHOD_TRAPEZOIDAL): vol.In(
INTEGRATION_METHODS
),
@ -176,6 +181,11 @@ _NAME_TO_INTEGRATION_METHOD: dict[str, type[_IntegrationMethod]] = {
}
class _IntegrationTrigger(Enum):
StateChange = "state_change"
TimeElapsed = "time_elapsed"
@dataclass
class IntegrationSensorExtraStoredData(SensorExtraStoredData):
"""Object to hold extra stored data."""
@ -261,6 +271,11 @@ async def async_setup_entry(
# Before we had support for optional selectors, "none" was used for selecting nothing
unit_prefix = None
if max_sub_interval_dict := config_entry.options.get(CONF_MAX_SUB_INTERVAL, None):
max_sub_interval = cv.time_period(max_sub_interval_dict)
else:
max_sub_interval = None
round_digits = config_entry.options.get(CONF_ROUND_DIGITS)
if round_digits:
round_digits = int(round_digits)
@ -274,6 +289,7 @@ async def async_setup_entry(
unit_prefix=unit_prefix,
unit_time=config_entry.options[CONF_UNIT_TIME],
device_info=device_info,
max_sub_interval=max_sub_interval,
)
async_add_entities([integral])
@ -294,6 +310,7 @@ async def async_setup_platform(
unique_id=config.get(CONF_UNIQUE_ID),
unit_prefix=config.get(CONF_UNIT_PREFIX),
unit_time=config[CONF_UNIT_TIME],
max_sub_interval=config.get(CONF_MAX_SUB_INTERVAL),
)
async_add_entities([integral])
@ -315,6 +332,7 @@ class IntegrationSensor(RestoreSensor):
unique_id: str | None,
unit_prefix: str | None,
unit_time: UnitOfTime,
max_sub_interval: timedelta | None,
device_info: DeviceInfo | None = None,
) -> None:
"""Initialize the integration sensor."""
@ -334,6 +352,14 @@ class IntegrationSensor(RestoreSensor):
self._source_entity: str = source_entity
self._last_valid_state: Decimal | None = None
self._attr_device_info = device_info
self._max_sub_interval: timedelta | None = (
None # disable time based integration
if max_sub_interval is None or max_sub_interval.total_seconds() == 0
else max_sub_interval
)
self._max_sub_interval_exceeded_callback: CALLBACK_TYPE = lambda *args: None
self._last_integration_time: datetime = datetime.now(tz=UTC)
self._last_integration_trigger = _IntegrationTrigger.StateChange
self._attr_suggested_display_precision = round_digits or 2
def _calculate_unit(self, source_unit: str) -> str:
@ -421,19 +447,55 @@ class IntegrationSensor(RestoreSensor):
self._attr_device_class = state.attributes.get(ATTR_DEVICE_CLASS)
self._unit_of_measurement = state.attributes.get(ATTR_UNIT_OF_MEASUREMENT)
if self._max_sub_interval is not None:
source_state = self.hass.states.get(self._sensor_source_id)
self._schedule_max_sub_interval_exceeded_if_state_is_numeric(source_state)
self.async_on_remove(self._cancel_max_sub_interval_exceeded_callback)
handle_state_change = self._integrate_on_state_change_and_max_sub_interval
else:
handle_state_change = self._integrate_on_state_change_callback
self.async_on_remove(
async_track_state_change_event(
self.hass,
[self._sensor_source_id],
self._handle_state_change,
handle_state_change,
)
)
@callback
def _handle_state_change(self, event: Event[EventStateChangedData]) -> None:
def _integrate_on_state_change_and_max_sub_interval(
self, event: Event[EventStateChangedData]
) -> None:
"""Integrate based on state change and time.
Next to doing the integration based on state change this method cancels and
reschedules time based integration.
"""
self._cancel_max_sub_interval_exceeded_callback()
old_state = event.data["old_state"]
new_state = event.data["new_state"]
try:
self._integrate_on_state_change(old_state, new_state)
self._last_integration_trigger = _IntegrationTrigger.StateChange
self._last_integration_time = datetime.now(tz=UTC)
finally:
# When max_sub_interval exceeds without state change the source is assumed
# constant with the last known state (new_state).
self._schedule_max_sub_interval_exceeded_if_state_is_numeric(new_state)
@callback
def _integrate_on_state_change_callback(
self, event: Event[EventStateChangedData]
) -> None:
"""Handle the sensor state changes."""
old_state = event.data["old_state"]
new_state = event.data["new_state"]
return self._integrate_on_state_change(old_state, new_state)
def _integrate_on_state_change(
self, old_state: State | None, new_state: State | None
) -> None:
if old_state is None or new_state is None:
return
@ -451,6 +513,8 @@ class IntegrationSensor(RestoreSensor):
elapsed_seconds = Decimal(
(new_state.last_updated - old_state.last_updated).total_seconds()
if self._last_integration_trigger == _IntegrationTrigger.StateChange
else (new_state.last_updated - self._last_integration_time).total_seconds()
)
area = self._method.calculate_area_with_two_states(elapsed_seconds, *states)
@ -458,6 +522,52 @@ class IntegrationSensor(RestoreSensor):
self._update_integral(area)
self.async_write_ha_state()
def _schedule_max_sub_interval_exceeded_if_state_is_numeric(
self, source_state: State | None
) -> None:
"""Schedule possible integration using the source state and max_sub_interval.
The callback reference is stored for possible cancellation if the source state
reports a change before max_sub_interval has passed.
If the callback is executed, meaning there was no state change reported, the
source_state is assumed constant and integration is done using its value.
"""
if (
self._max_sub_interval is not None
and source_state is not None
and (source_state_dec := _decimal_state(source_state.state))
):
@callback
def _integrate_on_max_sub_interval_exceeded_callback(now: datetime) -> None:
"""Integrate based on time and reschedule."""
elapsed_seconds = Decimal(
(now - self._last_integration_time).total_seconds()
)
self._derive_and_set_attributes_from_state(source_state)
area = self._method.calculate_area_with_one_state(
elapsed_seconds, source_state_dec
)
self._update_integral(area)
self.async_write_ha_state()
self._last_integration_time = datetime.now(tz=UTC)
self._last_integration_trigger = _IntegrationTrigger.TimeElapsed
self._schedule_max_sub_interval_exceeded_if_state_is_numeric(
source_state
)
self._max_sub_interval_exceeded_callback = async_call_later(
self.hass,
self._max_sub_interval,
_integrate_on_max_sub_interval_exceeded_callback,
)
def _cancel_max_sub_interval_exceeded_callback(self) -> None:
self._max_sub_interval_exceeded_callback()
@property
def native_value(self) -> Decimal | None:
"""Return the state of the sensor."""

View File

@ -11,12 +11,14 @@
"round": "Precision",
"source": "Input sensor",
"unit_prefix": "Metric prefix",
"unit_time": "Time unit"
"unit_time": "Time unit",
"max_sub_interval": "Max sub-interval"
},
"data_description": {
"round": "Controls the number of decimal digits in the output.",
"unit_prefix": "The output will be scaled according to the selected metric prefix.",
"unit_time": "The output will be scaled according to the selected time unit."
"unit_time": "The output will be scaled according to the selected time unit.",
"max_sub_interval": "Applies time based integration if the source did not change for this duration. Use 0 for no time based updates."
}
}
}

View File

@ -36,6 +36,7 @@ async def test_config_flow(hass: HomeAssistant, platform) -> None:
"round": 1,
"source": input_sensor_entity_id,
"unit_time": "min",
"max_sub_interval": {"seconds": 0},
},
)
await hass.async_block_till_done()
@ -49,6 +50,7 @@ async def test_config_flow(hass: HomeAssistant, platform) -> None:
"round": 1.0,
"source": "sensor.input",
"unit_time": "min",
"max_sub_interval": {"seconds": 0},
}
assert len(mock_setup_entry.mock_calls) == 1
@ -60,6 +62,7 @@ async def test_config_flow(hass: HomeAssistant, platform) -> None:
"round": 1.0,
"source": "sensor.input",
"unit_time": "min",
"max_sub_interval": {"seconds": 0},
}
assert config_entry.title == "My integration"
@ -89,6 +92,7 @@ async def test_options(hass: HomeAssistant, platform) -> None:
"source": "sensor.input",
"unit_prefix": "k",
"unit_time": "min",
"max_sub_interval": {"minutes": 1},
},
title="My integration",
)
@ -119,6 +123,7 @@ async def test_options(hass: HomeAssistant, platform) -> None:
"method": "right",
"round": 2.0,
"source": "sensor.input",
"max_sub_interval": {"minutes": 1},
},
)
assert result["type"] is FlowResultType.CREATE_ENTRY
@ -129,6 +134,7 @@ async def test_options(hass: HomeAssistant, platform) -> None:
"source": "sensor.input",
"unit_prefix": "k",
"unit_time": "min",
"max_sub_interval": {"minutes": 1},
}
assert config_entry.data == {}
assert config_entry.options == {
@ -138,6 +144,7 @@ async def test_options(hass: HomeAssistant, platform) -> None:
"source": "sensor.input",
"unit_prefix": "k",
"unit_time": "min",
"max_sub_interval": {"minutes": 1},
}
assert config_entry.title == "My integration"

View File

@ -30,6 +30,7 @@ async def test_setup_and_remove_config_entry(
"source": "sensor.input",
"unit_prefix": "k",
"unit_time": "min",
"max_sub_interval": {"minutes": 1},
},
title="My integration",
)

View File

@ -18,12 +18,17 @@ from homeassistant.const import (
UnitOfTime,
)
from homeassistant.core import HomeAssistant, State
from homeassistant.helpers import device_registry as dr, entity_registry as er
from homeassistant.helpers import (
condition,
device_registry as dr,
entity_registry as er,
)
from homeassistant.setup import async_setup_component
import homeassistant.util.dt as dt_util
from tests.common import (
MockConfigEntry,
async_fire_time_changed,
mock_restore_cache,
mock_restore_cache_with_extra_data,
)
@ -745,3 +750,146 @@ async def test_device_id(
integration_entity = entity_registry.async_get("sensor.integration")
assert integration_entity is not None
assert integration_entity.device_id == source_entity.device_id
def _integral_sensor_config(max_sub_interval: dict[str, int] | None = {"minutes": 1}):
sensor = {
"platform": "integration",
"name": "integration",
"source": "sensor.power",
"method": "right",
}
if max_sub_interval is not None:
sensor["max_sub_interval"] = max_sub_interval
return {"sensor": sensor}
async def _setup_integral_sensor(
hass: HomeAssistant, max_sub_interval: dict[str, int] | None = {"minutes": 1}
) -> None:
await async_setup_component(
hass, "sensor", _integral_sensor_config(max_sub_interval=max_sub_interval)
)
await hass.async_block_till_done()
async def _update_source_sensor(hass: HomeAssistant, value: int | str) -> None:
hass.states.async_set(
_integral_sensor_config()["sensor"]["source"],
value,
{ATTR_UNIT_OF_MEASUREMENT: UnitOfPower.KILO_WATT},
force_update=True,
)
await hass.async_block_till_done()
async def test_on_valid_source_expect_update_on_time(
hass: HomeAssistant,
) -> None:
"""Test whether time based integration updates the integral on a valid source."""
start_time = dt_util.utcnow()
with freeze_time(start_time) as freezer:
await _setup_integral_sensor(hass)
await _update_source_sensor(hass, 100)
state_before_max_sub_interval_exceeded = hass.states.get("sensor.integration")
freezer.tick(61)
async_fire_time_changed(hass, dt_util.now())
await hass.async_block_till_done()
state = hass.states.get("sensor.integration")
assert (
condition.async_numeric_state(hass, state_before_max_sub_interval_exceeded)
is False
)
assert state_before_max_sub_interval_exceeded.state != state.state
assert condition.async_numeric_state(hass, state) is True
assert float(state.state) > 1.69 # approximately 100 * 61 / 3600
assert float(state.state) < 1.8
async def test_on_unvailable_source_expect_no_update_on_time(
hass: HomeAssistant,
) -> None:
"""Test whether time based integration handles unavailability of the source properly."""
start_time = dt_util.utcnow()
with freeze_time(start_time) as freezer:
await _setup_integral_sensor(hass)
await _update_source_sensor(hass, 100)
freezer.tick(61)
async_fire_time_changed(hass, dt_util.now())
await hass.async_block_till_done()
state = hass.states.get("sensor.integration")
assert condition.async_numeric_state(hass, state) is True
await _update_source_sensor(hass, STATE_UNAVAILABLE)
await hass.async_block_till_done()
freezer.tick(61)
async_fire_time_changed(hass, dt_util.now())
await hass.async_block_till_done()
state = hass.states.get("sensor.integration")
assert condition.state(hass, state, STATE_UNAVAILABLE) is True
async def test_on_statechanges_source_expect_no_update_on_time(
hass: HomeAssistant,
) -> None:
"""Test whether state changes cancel time based integration."""
start_time = dt_util.utcnow()
with freeze_time(start_time) as freezer:
await _setup_integral_sensor(hass)
await _update_source_sensor(hass, 100)
freezer.tick(30)
await hass.async_block_till_done()
await _update_source_sensor(hass, 101)
state_after_30s = hass.states.get("sensor.integration")
assert condition.async_numeric_state(hass, state_after_30s) is True
freezer.tick(35)
async_fire_time_changed(hass, dt_util.now())
await hass.async_block_till_done()
state_after_65s = hass.states.get("sensor.integration")
assert (dt_util.now() - start_time).total_seconds() > 60
# No state change because the timer was cancelled because of an update after 30s
assert state_after_65s == state_after_30s
freezer.tick(35)
async_fire_time_changed(hass, dt_util.now())
await hass.async_block_till_done()
state_after_105s = hass.states.get("sensor.integration")
# Update based on time
assert float(state_after_105s.state) > float(state_after_65s.state)
async def test_on_no_max_sub_interval_expect_no_timebased_updates(
hass: HomeAssistant,
) -> None:
"""Test whether integratal is not updated by time when max_sub_interval is not configured."""
start_time = dt_util.utcnow()
with freeze_time(start_time) as freezer:
await _setup_integral_sensor(hass, max_sub_interval=None)
await _update_source_sensor(hass, 100)
await hass.async_block_till_done()
await _update_source_sensor(hass, 101)
await hass.async_block_till_done()
state_after_last_state_change = hass.states.get("sensor.integration")
assert (
condition.async_numeric_state(hass, state_after_last_state_change) is True
)
freezer.tick(100)
async_fire_time_changed(hass, dt_util.now())
await hass.async_block_till_done()
state_after_100s = hass.states.get("sensor.integration")
assert state_after_100s == state_after_last_state_change