Improve integration sensor tests (#149180)

This commit is contained in:
Erik Montnemery 2025-07-21 18:23:58 +02:00 committed by GitHub
parent 7d895653fb
commit 941d3c2be4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -21,12 +21,19 @@ from homeassistant.const import (
UnitOfTime,
UnitOfVolumeFlowRate,
)
from homeassistant.core import HomeAssistant, State
from homeassistant.core import (
Event,
EventStateChangedData,
HomeAssistant,
State,
callback,
)
from homeassistant.helpers import (
condition,
device_registry as dr,
entity_registry as er,
)
from homeassistant.helpers.event import async_track_state_change_event
from homeassistant.setup import async_setup_component
from homeassistant.util import dt as dt_util
@ -297,24 +304,32 @@ async def test_restore_state_failed(hass: HomeAssistant, extra_attributes) -> No
@pytest.mark.parametrize("extra_config", [{}, {"max_sub_interval": {"minutes": 9999}}])
@pytest.mark.parametrize("force_update", [False, True])
@pytest.mark.parametrize(
"sequence",
("sequence", "expected_states"),
[
# time, value, attributes, expected
# time, value, attributes
(
(0, 0, {}, 0),
(20, 10, {}, 1.67),
(30, 30, {}, 5.0),
(40, 5, {}, 7.92),
(50, 5, {}, 8.75), # This fires a state report
(60, 0, {}, 9.17),
(
(0, 0, {}),
(20, 10, {}),
(30, 30, {}),
(40, 5, {}),
(50, 5, {}), # This fires a state report
(60, 5, {}), # This fires a state report
(70, 0, {}),
),
(0, 1.67, 5.0, 7.92, 8.75, 9.58, 10.0),
),
(
(0, 0, {}, 0),
(20, 10, {}, 1.67),
(30, 30, {}, 5.0),
(40, 5, {}, 7.92),
(50, 5, {"foo": "bar"}, 8.75), # This fires a state change
(60, 0, {}, 9.17),
(
(0, 0, {}),
(20, 10, {}),
(30, 30, {}),
(40, 5, {}),
(50, 5, {"foo": "bar"}), # This fires a state change
(60, 5, {"foo": "baz"}), # This fires a state change
(70, 0, {}),
),
(0, 1.67, 5.0, 7.92, 8.75, 9.58, 10.0),
),
],
)
@ -323,8 +338,17 @@ async def test_trapezoidal(
sequence: tuple[tuple[float, float, dict[str, Any], float], ...],
force_update: bool,
extra_config: dict[str, Any],
expected_states: tuple[float, ...],
) -> None:
"""Test integration sensor state."""
events: list[Event[EventStateChangedData]] = []
@callback
def _capture_event(event: Event) -> None:
events.append(event)
async_track_state_change_event(hass, "sensor.integration", _capture_event)
config = {
"sensor": {
"platform": "integration",
@ -349,7 +373,7 @@ async def test_trapezoidal(
start_time = dt_util.utcnow()
with freeze_time(start_time) as freezer:
# Testing a power sensor with non-monotonic intervals and values
for time, value, extra_attributes, expected in sequence:
for time, value, extra_attributes in sequence:
freezer.move_to(start_time + timedelta(minutes=time))
hass.states.async_set(
entity_id,
@ -357,32 +381,45 @@ async def test_trapezoidal(
{ATTR_UNIT_OF_MEASUREMENT: UnitOfPower.KILO_WATT} | extra_attributes,
force_update=force_update,
)
await hass.async_block_till_done()
state = hass.states.get("sensor.integration")
assert round(float(state.state), config["sensor"]["round"]) == expected
await hass.async_block_till_done()
await hass.async_block_till_done()
states = [events[0].data["new_state"].state] + [
round(float(event.data["new_state"].state), config["sensor"]["round"])
for event in events[1:]
]
assert states == ["unknown", *expected_states]
state = events[-1].data["new_state"]
assert state.attributes.get("unit_of_measurement") == UnitOfEnergy.KILO_WATT_HOUR
@pytest.mark.parametrize("extra_config", [{}, {"max_sub_interval": {"minutes": 9999}}])
@pytest.mark.parametrize("force_update", [False, True])
@pytest.mark.parametrize(
"sequence",
("sequence", "expected_states"),
[
# time, value, attributes, expected
(
(20, 10, {}, 0.0),
(30, 30, {}, 1.67),
(40, 5, {}, 6.67),
(50, 5, {}, 7.5), # This fires a state report
(60, 0, {}, 8.33),
( # time, value, attributes, expected
(
(20, 10, {}),
(30, 30, {}),
(40, 5, {}),
(50, 5, {}), # This fires a state report
(60, 5, {}), # This fires a state report
(70, 0, {}),
),
(0, 1.67, 6.67, 7.5, 8.33, 9.17),
),
(
(20, 10, {}, 0.0),
(30, 30, {}, 1.67),
(40, 5, {}, 6.67),
(50, 5, {"foo": "bar"}, 7.5), # This fires a state change
(60, 0, {}, 8.33),
(
(20, 10, {}),
(30, 30, {}),
(40, 5, {}),
(50, 5, {"foo": "bar"}),
(60, 5, {"foo": "baz"}),
(70, 0, {}),
),
(0, 1.67, 6.67, 7.5, 8.33, 9.17),
),
],
)
@ -391,8 +428,17 @@ async def test_left(
sequence: tuple[tuple[float, float, dict[str, Any], float], ...],
force_update: bool,
extra_config: dict[str, Any],
expected_states: tuple[float, ...],
) -> None:
"""Test integration sensor state with left Riemann method."""
events: list[Event[EventStateChangedData]] = []
@callback
def _capture_event(event: Event) -> None:
events.append(event)
async_track_state_change_event(hass, "sensor.integration", _capture_event)
config = {
"sensor": {
"platform": "integration",
@ -420,7 +466,7 @@ async def test_left(
# Testing a power sensor with non-monotonic intervals and values
start_time = dt_util.utcnow()
with freeze_time(start_time) as freezer:
for time, value, extra_attributes, expected in sequence:
for time, value, extra_attributes in sequence:
freezer.move_to(start_time + timedelta(minutes=time))
hass.states.async_set(
entity_id,
@ -428,31 +474,50 @@ async def test_left(
{ATTR_UNIT_OF_MEASUREMENT: UnitOfPower.KILO_WATT} | extra_attributes,
force_update=force_update,
)
await hass.async_block_till_done()
state = hass.states.get("sensor.integration")
assert round(float(state.state), config["sensor"]["round"]) == expected
await hass.async_block_till_done()
await hass.async_block_till_done()
states = (
[events[0].data["new_state"].state]
+ [events[1].data["new_state"].state]
+ [
round(float(event.data["new_state"].state), config["sensor"]["round"])
for event in events[2:]
]
)
assert states == ["unknown", "unknown", *expected_states]
state = events[-1].data["new_state"]
assert state.attributes.get("unit_of_measurement") == UnitOfEnergy.KILO_WATT_HOUR
@pytest.mark.parametrize("extra_config", [{}, {"max_sub_interval": {"minutes": 9999}}])
@pytest.mark.parametrize("force_update", [False, True])
@pytest.mark.parametrize(
"sequence",
("sequence", "expected_states"),
[
# time, value, attributes, expected
(
(20, 10, {}, 3.33),
(30, 30, {}, 8.33),
(40, 5, {}, 9.17),
(50, 5, {}, 10.0), # This fires a state report
(60, 0, {}, 10.0),
(
(20, 10, {}),
(30, 30, {}),
(40, 5, {}),
(50, 5, {}), # This fires a state report
(60, 5, {}), # This fires a state report
(70, 0, {}),
),
(3.33, 8.33, 9.17, 10.0, 10.83),
),
(
(20, 10, {}, 3.33),
(30, 30, {}, 8.33),
(40, 5, {}, 9.17),
(50, 5, {"foo": "bar"}, 10.0), # This fires a state change
(60, 0, {}, 10.0),
(
(20, 10, {}),
(30, 30, {}),
(40, 5, {}),
(50, 5, {"foo": "bar"}), # This fires a state change
(60, 5, {"foo": "baz"}), # This fires a state change
(70, 0, {}),
),
(3.33, 8.33, 9.17, 10.0, 10.83),
),
],
)
@ -461,8 +526,17 @@ async def test_right(
sequence: tuple[tuple[float, float, dict[str, Any], float], ...],
force_update: bool,
extra_config: dict[str, Any],
expected_states: tuple[float, ...],
) -> None:
"""Test integration sensor state with right Riemann method."""
events: list[Event[EventStateChangedData]] = []
@callback
def _capture_event(event: Event) -> None:
events.append(event)
async_track_state_change_event(hass, "sensor.integration", _capture_event)
config = {
"sensor": {
"platform": "integration",
@ -490,7 +564,7 @@ async def test_right(
# Testing a power sensor with non-monotonic intervals and values
start_time = dt_util.utcnow()
with freeze_time(start_time) as freezer:
for time, value, extra_attributes, expected in sequence:
for time, value, extra_attributes in sequence:
freezer.move_to(start_time + timedelta(minutes=time))
hass.states.async_set(
entity_id,
@ -498,10 +572,20 @@ async def test_right(
{ATTR_UNIT_OF_MEASUREMENT: UnitOfPower.KILO_WATT} | extra_attributes,
force_update=force_update,
)
await hass.async_block_till_done()
state = hass.states.get("sensor.integration")
assert round(float(state.state), config["sensor"]["round"]) == expected
await hass.async_block_till_done()
await hass.async_block_till_done()
states = (
[events[0].data["new_state"].state]
+ [events[1].data["new_state"].state]
+ [
round(float(event.data["new_state"].state), config["sensor"]["round"])
for event in events[2:]
]
)
assert states == ["unknown", "unknown", *expected_states]
state = events[-1].data["new_state"]
assert state.attributes.get("unit_of_measurement") == UnitOfEnergy.KILO_WATT_HOUR