Tweak integration sensor (#114384)

* Tweak integration sensor

* Improve tests
This commit is contained in:
Erik Montnemery 2024-04-02 09:19:25 +02:00 committed by GitHub
parent 667e119d32
commit 0030c97f59
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 65 additions and 33 deletions

View File

@ -30,7 +30,6 @@ from homeassistant.const import (
)
from homeassistant.core import Event, HomeAssistant, State, callback
from homeassistant.helpers import (
condition,
config_validation as cv,
device_registry as dr,
entity_registry as er,
@ -97,57 +96,72 @@ class _IntegrationMethod(ABC):
return _NAME_TO_INTEGRATION_METHOD[method_name]()
@abstractmethod
def validate_states(self, left: State, right: State) -> bool:
def validate_states(
self, left: State, right: State
) -> tuple[Decimal, Decimal] | None:
"""Check state requirements for integration."""
@abstractmethod
def calculate_area_with_two_states(
self, elapsed_time: float, left: State, right: State
self, elapsed_time: Decimal, left: Decimal, right: Decimal
) -> Decimal:
"""Calculate area given two states."""
def calculate_area_with_one_state(
self, elapsed_time: float, constant_state: State
self, elapsed_time: Decimal, constant_state: Decimal
) -> Decimal:
return Decimal(constant_state.state) * Decimal(elapsed_time)
return constant_state * elapsed_time
class _Trapezoidal(_IntegrationMethod):
def calculate_area_with_two_states(
self, elapsed_time: float, left: State, right: State
self, elapsed_time: Decimal, left: Decimal, right: Decimal
) -> Decimal:
return Decimal(elapsed_time) * (Decimal(left.state) + Decimal(right.state)) / 2
return elapsed_time * (left + right) / 2
def validate_states(self, left: State, right: State) -> bool:
return _is_numeric_state(left) and _is_numeric_state(right)
def validate_states(
self, left: State, right: State
) -> tuple[Decimal, Decimal] | None:
if (left_dec := _decimal_state(left.state)) is None or (
right_dec := _decimal_state(right.state)
) is None:
return None
return (left_dec, right_dec)
class _Left(_IntegrationMethod):
def calculate_area_with_two_states(
self, elapsed_time: float, left: State, right: State
self, elapsed_time: Decimal, left: Decimal, right: Decimal
) -> Decimal:
return self.calculate_area_with_one_state(elapsed_time, left)
def validate_states(self, left: State, right: State) -> bool:
return _is_numeric_state(left)
def validate_states(
self, left: State, right: State
) -> tuple[Decimal, Decimal] | None:
if (left_dec := _decimal_state(left.state)) is None:
return None
return (left_dec, left_dec)
class _Right(_IntegrationMethod):
def calculate_area_with_two_states(
self, elapsed_time: float, left: State, right: State
self, elapsed_time: Decimal, left: Decimal, right: Decimal
) -> Decimal:
return self.calculate_area_with_one_state(elapsed_time, right)
def validate_states(self, left: State, right: State) -> bool:
return _is_numeric_state(right)
def validate_states(
self, left: State, right: State
) -> tuple[Decimal, Decimal] | None:
if (right_dec := _decimal_state(right.state)) is None:
return None
return (right_dec, right_dec)
def _is_numeric_state(state: State) -> bool:
def _decimal_state(state: str) -> Decimal | None:
try:
float(state.state)
except (ValueError, TypeError):
return False
return True
return Decimal(state)
except (InvalidOperation, TypeError):
return None
_NAME_TO_INTEGRATION_METHOD: dict[str, type[_IntegrationMethod]] = {
@ -413,7 +427,7 @@ class IntegrationSensor(RestoreSensor):
if old_state is None or new_state is None:
return
if condition.state(self.hass, new_state, [STATE_UNAVAILABLE]):
if new_state.state == STATE_UNAVAILABLE:
self._attr_available = False
self.async_write_ha_state()
return
@ -421,18 +435,16 @@ class IntegrationSensor(RestoreSensor):
self._attr_available = True
self._derive_and_set_attributes_from_state(new_state)
if not self._method.validate_states(old_state, new_state):
if not (states := self._method.validate_states(old_state, new_state)):
self.async_write_ha_state()
return
elapsed_seconds = (
new_state.last_updated - old_state.last_updated
).total_seconds()
area = self._method.calculate_area_with_two_states(
elapsed_seconds, old_state, new_state
elapsed_seconds = Decimal(
(new_state.last_updated - old_state.last_updated).total_seconds()
)
area = self._method.calculate_area_with_two_states(elapsed_seconds, *states)
self._update_integral(area)
self.async_write_ha_state()

View File

@ -629,8 +629,17 @@ async def test_device_class(hass: HomeAssistant, method) -> None:
assert state.attributes.get("device_class") == SensorDeviceClass.ENERGY
@pytest.mark.parametrize("method", ["trapezoidal", "left", "right"])
async def test_calc_errors(hass: HomeAssistant, method) -> None:
@pytest.mark.parametrize(
("method", "expected_states"),
[
("trapezoidal", [STATE_UNKNOWN, "0.500", "0.500"]),
("left", [STATE_UNKNOWN, "0.000", "1.000"]),
("right", ["0.000", "1.000", "1.000"]),
],
)
async def test_calc_errors(
hass: HomeAssistant, method: str, expected_states: list[str]
) -> None:
"""Test integration sensor units using a power source."""
config = {
"sensor": {
@ -649,9 +658,9 @@ async def test_calc_errors(hass: HomeAssistant, method) -> None:
hass.states.async_set(entity_id, None, {})
await hass.async_block_till_done()
state = hass.states.get("sensor.integration")
# With the source sensor in a None state, the Reimann sensor should be
# unknown
state = hass.states.get("sensor.integration")
assert state is not None
assert state.state == STATE_UNKNOWN
@ -665,7 +674,7 @@ async def test_calc_errors(hass: HomeAssistant, method) -> None:
state = hass.states.get("sensor.integration")
assert state is not None
assert state.state == STATE_UNKNOWN if method != "right" else "0.000"
assert state.state == expected_states[0]
# With the source sensor updated successfully, the Reimann sensor
# should have a zero (known) value.
@ -677,7 +686,18 @@ async def test_calc_errors(hass: HomeAssistant, method) -> None:
state = hass.states.get("sensor.integration")
assert state is not None
assert round(float(state.state)) == 0 if method != "right" else 1
assert state.state == expected_states[1]
# Set the source sensor back to a non numeric state
now += timedelta(seconds=3600)
with freeze_time(now):
hass.states.async_set(entity_id, "unexpected", {"device_class": None})
await hass.async_block_till_done()
await hass.async_block_till_done()
state = hass.states.get("sensor.integration")
assert state is not None
assert state.state == expected_states[2]
async def test_device_id(