Fix history start time state returning None with some postgresql versions (#93555)

* Add additional coverage to history websocket api

related issue #93258

* Fix results when the union query's start time state ends up at the end instead of the front

* Apply suggestions from code review

* resort

* zero instead

* fix exception

* fix tests
J. Nick Koston 2023-05-25 21:18:19 -05:00 committed by GitHub
parent 61e57872c3
commit f251c464e2
4 changed files with 147 additions and 82 deletions
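The heart of the change is in the first file below. The synthetic start time state row used to select its last_updated_ts as a NULL literal cast to DOUBLE_PRECISION (via the now-removed CASTABLE_DOUBLE_TYPE), and per the commit title some PostgreSQL versions then surfaced the start time state as None. The fix selects a literal 0 instead and lets the consumer fall back to the requested start time (the `last_updated_ts or start_time_ts` line in row_to_compressed_state). A minimal stand-alone sketch of the before/after column construction, using plain SQLAlchemy rather than the real recorder schema:

    from sqlalchemy import Integer, literal, select
    from sqlalchemy.dialects import postgresql

    # The variant type this commit removes: Integer on MySQL/MariaDB
    # (which cannot CAST to DOUBLE), DOUBLE PRECISION on PostgreSQL.
    CASTABLE_DOUBLE_TYPE = Integer().with_variant(
        postgresql.DOUBLE_PRECISION(), "postgresql"
    )

    # Before: a typed NULL so the UNION column types lined up.
    before = select(
        literal(value=None).label("last_updated_ts").cast(CASTABLE_DOUBLE_TYPE)
    )

    # After: a plain 0; a falsy timestamp later falls back to the
    # requested start time instead of surfacing as None.
    after = select(literal(value=0).label("last_updated_ts"))

    print(before.compile(dialect=postgresql.dialect(), compile_kwargs={"literal_binds": True}))
    print(after.compile(dialect=postgresql.dialect(), compile_kwargs={"literal_binds": True}))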


@@ -9,7 +9,6 @@ from typing import Any, cast
from sqlalchemy import (
CompoundSelect,
Integer,
Select,
Subquery,
and_,
@@ -19,7 +18,6 @@ from sqlalchemy import (
select,
union_all,
)
from sqlalchemy.dialects import postgresql
from sqlalchemy.engine.row import Row
from sqlalchemy.orm.session import Session
@@ -52,16 +50,6 @@ _FIELD_MAP = {
}
CASTABLE_DOUBLE_TYPE = (
# MySQL/MariaDB < 10.4+ does not support casting to DOUBLE so we have to use Integer instead but it doesn't
# matter because we don't use the value as its always set to NULL
#
# sqlalchemy.exc.SAWarning: Datatype DOUBLE does not support CAST on MySQL/MariaDb; the CAST will be skipped.
#
Integer().with_variant(postgresql.DOUBLE_PRECISION(), "postgresql")
)
def _stmt_and_join_attributes(
no_attributes: bool, include_last_changed: bool
) -> Select:
@@ -79,13 +67,9 @@ def _stmt_and_join_attributes_for_start_state(
) -> Select:
"""Return the statement and if StateAttributes should be joined."""
_select = select(States.metadata_id, States.state)
_select = _select.add_columns(
literal(value=None).label("last_updated_ts").cast(CASTABLE_DOUBLE_TYPE)
)
_select = _select.add_columns(literal(value=0).label("last_updated_ts"))
if include_last_changed:
_select = _select.add_columns(
literal(value=None).label("last_changed_ts").cast(CASTABLE_DOUBLE_TYPE)
)
_select = _select.add_columns(literal(value=0).label("last_changed_ts"))
if not no_attributes:
_select = _select.add_columns(SHARED_ATTR_OR_LEGACY_ATTRIBUTES)
return _select
@@ -174,28 +158,29 @@ def _significant_states_stmt(
stmt = stmt.outerjoin(
StateAttributes, States.attributes_id == StateAttributes.attributes_id
)
stmt = stmt.order_by(States.metadata_id, States.last_updated_ts)
if not include_start_time_state or not run_start_ts:
stmt = stmt.order_by(States.metadata_id, States.last_updated_ts)
return stmt
return _select_from_subquery(
union_all(
_select_from_subquery(
_get_start_time_state_stmt(
run_start_ts,
start_time_ts,
single_metadata_id,
metadata_ids,
no_attributes,
include_last_changed,
).subquery(),
unioned_subquery = union_all(
_select_from_subquery(
_get_start_time_state_stmt(
run_start_ts,
start_time_ts,
single_metadata_id,
metadata_ids,
no_attributes,
include_last_changed,
),
_select_from_subquery(stmt.subquery(), no_attributes, include_last_changed),
).subquery(),
).subquery(),
no_attributes,
include_last_changed,
),
_select_from_subquery(stmt.subquery(), no_attributes, include_last_changed),
).subquery()
return _select_from_subquery(
unioned_subquery,
no_attributes,
include_last_changed,
)
).order_by(unioned_subquery.c.metadata_id, unioned_subquery.c.last_updated_ts)
def get_significant_states_with_session(
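The restructuring above is the "resort" part of the fix: previously the ORDER BY was attached to the inner statement before it was unioned with the start time state, and SQL does not guarantee that a UNION ALL preserves the order of its inputs, so the start time state rows could come back at the end instead of the front. The fix orders the unioned subquery itself. A toy illustration of the pattern, with a made-up table standing in for the recorder's states table:

    from sqlalchemy import Column, Float, Integer, MetaData, Table, select, union_all

    metadata_obj = MetaData()
    # Hypothetical stand-in for the recorder's states table.
    states = Table(
        "states",
        metadata_obj,
        Column("metadata_id", Integer),
        Column("last_updated_ts", Float),
    )

    start_time_rows = select(states.c.metadata_id, states.c.last_updated_ts).where(
        states.c.last_updated_ts < 100.0
    )
    period_rows = select(states.c.metadata_id, states.c.last_updated_ts).where(
        states.c.last_updated_ts >= 100.0
    )

    # Ordering must be applied to the union, not to one of its inputs,
    # for the combined rows to come back sorted.
    unioned = union_all(start_time_rows, period_rows).subquery()
    stmt = select(unioned).order_by(unioned.c.metadata_id, unioned.c.last_updated_ts)
    print(stmt)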
@@ -279,6 +264,7 @@ def get_significant_states_with_session(
entity_id_to_metadata_id,
minimal_response,
compressed_state_format,
no_attributes=no_attributes,
)
@@ -433,6 +419,7 @@ def state_changes_during_period(
entity_ids,
entity_id_to_metadata_id,
descending=descending,
no_attributes=no_attributes,
),
)
@@ -528,6 +515,7 @@ def get_last_state_changes(
None,
entity_ids,
entity_id_to_metadata_id,
no_attributes=False,
),
)
@@ -651,6 +639,7 @@ def _sorted_states_to_dict(
minimal_response: bool = False,
compressed_state_format: bool = False,
descending: bool = False,
no_attributes: bool = False,
) -> MutableMapping[str, list[State | dict[str, Any]]]:
"""Convert SQL results into JSON friendly data structure.
@@ -665,7 +654,7 @@
"""
field_map = _FIELD_MAP
state_class: Callable[
[Row, dict[str, dict[str, Any]], float | None, str, str, float | None],
[Row, dict[str, dict[str, Any]], float | None, str, str, float | None, bool],
State | dict[str, Any],
]
if compressed_state_format:
@@ -716,6 +705,7 @@
entity_id,
db_state[state_idx],
db_state[last_updated_ts_idx],
False,
)
for db_state in group
)
@@ -738,6 +728,7 @@
entity_id,
prev_state, # type: ignore[arg-type]
first_state[last_updated_ts_idx],
no_attributes,
)
)


@@ -53,6 +53,7 @@ class LazyState(State):
entity_id: str,
state: str,
last_updated_ts: float | None,
no_attributes: bool,
) -> None:
"""Init the lazy state."""
self._row = row
@@ -143,14 +144,14 @@ def row_to_compressed_state(
entity_id: str,
state: str,
last_updated_ts: float | None,
no_attributes: bool,
) -> dict[str, Any]:
"""Convert a database row to a compressed state schema 41 and later."""
comp_state: dict[str, Any] = {
COMPRESSED_STATE_STATE: state,
COMPRESSED_STATE_ATTRIBUTES: decode_attributes_from_source(
comp_state: dict[str, Any] = {COMPRESSED_STATE_STATE: state}
if not no_attributes:
comp_state[COMPRESSED_STATE_ATTRIBUTES] = decode_attributes_from_source(
getattr(row, "attributes", None), attr_cache
),
}
)
row_last_updated_ts: float = last_updated_ts or start_time_ts # type: ignore[assignment]
comp_state[COMPRESSED_STATE_LAST_UPDATED] = row_last_updated_ts
if (
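With no_attributes threaded through, row_to_compressed_state now leaves the "a" key out of the payload entirely instead of emitting an empty dict, which is why the websocket test expectations below shrink from {"a": {}, "lu": ..., "s": ...} to {"lu": ..., "s": ...}. A simplified sketch of the resulting payload shape (the constant names match the diff; the helper itself is illustrative):

    from typing import Any

    COMPRESSED_STATE_STATE = "s"
    COMPRESSED_STATE_ATTRIBUTES = "a"
    COMPRESSED_STATE_LAST_UPDATED = "lu"

    def compressed_state(
        state: str,
        last_updated_ts: float,
        attributes: dict[str, Any],
        no_attributes: bool,
    ) -> dict[str, Any]:
        """Build the compressed payload; "a" is omitted when no_attributes is set."""
        comp: dict[str, Any] = {COMPRESSED_STATE_STATE: state}
        if not no_attributes:
            comp[COMPRESSED_STATE_ATTRIBUTES] = attributes
        comp[COMPRESSED_STATE_LAST_UPDATED] = last_updated_ts
        return comp

    assert compressed_state("on", 1.0, {}, no_attributes=True) == {"s": "on", "lu": 1.0}
    assert compressed_state("on", 1.0, {}, no_attributes=False) == {
        "s": "on",
        "a": {},
        "lu": 1.0,
    }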


@@ -1,5 +1,6 @@
"""The tests the History component websocket_api."""
# pylint: disable=protected-access,invalid-name
import asyncio
from datetime import timedelta
from unittest.mock import patch
@@ -97,7 +98,7 @@ async def test_history_during_period(
assert len(sensor_test_history) == 3
assert sensor_test_history[0]["s"] == "on"
assert sensor_test_history[0]["a"] == {}
assert "a" not in sensor_test_history[0] # no_attributes = True
assert isinstance(sensor_test_history[0]["lu"], float)
assert "lc" not in sensor_test_history[0] # skipped if the same a last_updated (lu)
@@ -510,17 +511,13 @@ async def test_history_stream_historical_only(
"start_time": now.timestamp(),
"states": {
"sensor.four": [
{"a": {}, "lu": sensor_four_last_updated.timestamp(), "s": "off"}
],
"sensor.one": [
{"a": {}, "lu": sensor_one_last_updated.timestamp(), "s": "on"}
{"lu": sensor_four_last_updated.timestamp(), "s": "off"}
],
"sensor.one": [{"lu": sensor_one_last_updated.timestamp(), "s": "on"}],
"sensor.three": [
{"a": {}, "lu": sensor_three_last_updated.timestamp(), "s": "off"}
],
"sensor.two": [
{"a": {}, "lu": sensor_two_last_updated.timestamp(), "s": "off"}
{"lu": sensor_three_last_updated.timestamp(), "s": "off"}
],
"sensor.two": [{"lu": sensor_two_last_updated.timestamp(), "s": "off"}],
},
},
"id": 1,
@@ -857,12 +854,8 @@ async def test_history_stream_live_no_attributes_minimal_response(
"end_time": first_end_time,
"start_time": now.timestamp(),
"states": {
"sensor.one": [
{"a": {}, "lu": sensor_one_last_updated.timestamp(), "s": "on"}
],
"sensor.two": [
{"a": {}, "lu": sensor_two_last_updated.timestamp(), "s": "off"}
],
"sensor.one": [{"lu": sensor_one_last_updated.timestamp(), "s": "on"}],
"sensor.two": [{"lu": sensor_two_last_updated.timestamp(), "s": "off"}],
},
},
"id": 1,
@@ -1220,12 +1213,8 @@ async def test_history_stream_live_no_attributes_minimal_response_specific_entit
"end_time": first_end_time,
"start_time": now.timestamp(),
"states": {
"sensor.one": [
{"a": {}, "lu": sensor_one_last_updated.timestamp(), "s": "on"}
],
"sensor.two": [
{"a": {}, "lu": sensor_two_last_updated.timestamp(), "s": "off"}
],
"sensor.one": [{"lu": sensor_one_last_updated.timestamp(), "s": "on"}],
"sensor.two": [{"lu": sensor_two_last_updated.timestamp(), "s": "off"}],
},
},
"id": 1,
@@ -1306,12 +1295,8 @@ async def test_history_stream_live_with_future_end_time(
"end_time": first_end_time,
"start_time": now.timestamp(),
"states": {
"sensor.one": [
{"a": {}, "lu": sensor_one_last_updated.timestamp(), "s": "on"}
],
"sensor.two": [
{"a": {}, "lu": sensor_two_last_updated.timestamp(), "s": "off"}
],
"sensor.one": [{"lu": sensor_one_last_updated.timestamp(), "s": "on"}],
"sensor.two": [{"lu": sensor_two_last_updated.timestamp(), "s": "off"}],
},
},
"id": 1,
@@ -1505,10 +1490,10 @@ async def test_overflow_queue(
"start_time": now.timestamp(),
"states": {
"sensor.one": [
{"a": {}, "lu": sensor_one_last_updated.timestamp(), "s": "on"}
{"lu": sensor_one_last_updated.timestamp(), "s": "on"}
],
"sensor.two": [
{"a": {}, "lu": sensor_two_last_updated.timestamp(), "s": "off"}
{"lu": sensor_two_last_updated.timestamp(), "s": "off"}
],
},
},
@@ -1722,9 +1707,7 @@ async def test_history_stream_for_invalid_entity_ids(
"end_time": sensor_one_last_updated.timestamp(),
"start_time": now.timestamp(),
"states": {
"sensor.one": [
{"a": {}, "lu": sensor_one_last_updated.timestamp(), "s": "on"}
],
"sensor.one": [{"lu": sensor_one_last_updated.timestamp(), "s": "on"}],
},
},
"id": 1,
@@ -1754,12 +1737,8 @@ async def test_history_stream_for_invalid_entity_ids(
"end_time": sensor_two_last_updated.timestamp(),
"start_time": now.timestamp(),
"states": {
"sensor.one": [
{"a": {}, "lu": sensor_one_last_updated.timestamp(), "s": "on"}
],
"sensor.two": [
{"a": {}, "lu": sensor_two_last_updated.timestamp(), "s": "off"}
],
"sensor.one": [{"lu": sensor_one_last_updated.timestamp(), "s": "on"}],
"sensor.two": [{"lu": sensor_two_last_updated.timestamp(), "s": "off"}],
},
},
"id": 2,
@@ -1843,3 +1822,91 @@
"type": "result",
"success": False,
}
async def test_history_stream_historical_only_with_start_time_state_past(
recorder_mock: Recorder, hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
"""Test history stream."""
await async_setup_component(
hass,
"history",
{},
)
await async_setup_component(hass, "sensor", {})
hass.states.async_set("sensor.one", "first", attributes={"any": "attr"})
hass.states.get("sensor.one").last_updated
await async_recorder_block_till_done(hass)
await asyncio.sleep(0.00002)
now = dt_util.utcnow()
await async_recorder_block_till_done(hass)
hass.states.async_set("sensor.one", "second", attributes={"any": "attr"})
sensor_one_last_updated_second = hass.states.get("sensor.one").last_updated
await asyncio.sleep(0.00001)
hass.states.async_set("sensor.one", "third", attributes={"any": "attr"})
sensor_one_last_updated_third = hass.states.get("sensor.one").last_updated
await async_recorder_block_till_done(hass)
hass.states.async_set("sensor.two", "off", attributes={"any": "attr"})
sensor_two_last_updated = hass.states.get("sensor.two").last_updated
await async_recorder_block_till_done(hass)
hass.states.async_set("sensor.three", "off", attributes={"any": "changed"})
sensor_three_last_updated = hass.states.get("sensor.three").last_updated
await async_recorder_block_till_done(hass)
hass.states.async_set("sensor.four", "off", attributes={"any": "again"})
sensor_four_last_updated = hass.states.get("sensor.four").last_updated
await async_recorder_block_till_done(hass)
hass.states.async_set("switch.excluded", "off", attributes={"any": "again"})
await async_wait_recording_done(hass)
end_time = dt_util.utcnow()
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "history/stream",
"entity_ids": ["sensor.one", "sensor.two", "sensor.three", "sensor.four"],
"start_time": now.isoformat(),
"end_time": end_time.isoformat(),
"include_start_time_state": True,
"significant_changes_only": False,
"no_attributes": True,
"minimal_response": True,
}
)
response = await client.receive_json()
assert response["success"]
assert response["id"] == 1
assert response["type"] == "result"
response = await client.receive_json()
assert response == {
"event": {
"end_time": sensor_four_last_updated.timestamp(),
"start_time": now.timestamp(),
"states": {
"sensor.four": [
{"lu": sensor_four_last_updated.timestamp(), "s": "off"}
],
"sensor.one": [
{
"lu": now.timestamp(),
"s": "first",
}, # should use start time state
{"lu": sensor_one_last_updated_second.timestamp(), "s": "second"},
{"lu": sensor_one_last_updated_third.timestamp(), "s": "third"},
],
"sensor.three": [
{"lu": sensor_three_last_updated.timestamp(), "s": "off"}
],
"sensor.two": [{"lu": sensor_two_last_updated.timestamp(), "s": "off"}],
},
},
"id": 1,
"type": "event",
}
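The "should use start time state" row is where the zero sentinel from the first file surfaces: the start time state row carries last_updated_ts = 0, and the serializer substitutes the request's start time via the `last_updated_ts or start_time_ts` fallback shown earlier. In miniature:

    start_time_ts = 1_685_000_000.0  # hypothetical request start time
    last_updated_ts = 0.0  # zero sentinel selected for the start time state row
    row_last_updated_ts = last_updated_ts or start_time_ts
    assert row_last_updated_ts == start_time_ts  # reported as "lu": now.timestamp()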


@@ -271,7 +271,7 @@ async def test_lazy_state_handles_include_json(
entity_id="sensor.invalid",
shared_attrs="{INVALID_JSON}",
)
assert LazyState(row, {}, None, row.entity_id, "", 1).attributes == {}
assert LazyState(row, {}, None, row.entity_id, "", 1, False).attributes == {}
assert "Error converting row to state attributes" in caplog.text
@@ -283,7 +283,9 @@ async def test_lazy_state_can_decode_attributes(
entity_id="sensor.invalid",
attributes='{"shared":true}',
)
assert LazyState(row, {}, None, row.entity_id, "", 1).attributes == {"shared": True}
assert LazyState(row, {}, None, row.entity_id, "", 1, False).attributes == {
"shared": True
}
async def test_lazy_state_handles_different_last_updated_and_last_changed(
@@ -298,7 +300,9 @@ async def test_lazy_state_handles_different_last_updated_and_last_changed(
last_updated_ts=now.timestamp(),
last_changed_ts=(now - timedelta(seconds=60)).timestamp(),
)
lstate = LazyState(row, {}, None, row.entity_id, row.state, row.last_updated_ts)
lstate = LazyState(
row, {}, None, row.entity_id, row.state, row.last_updated_ts, False
)
assert lstate.as_dict() == {
"attributes": {"shared": True},
"entity_id": "sensor.valid",
@@ -329,7 +333,9 @@ async def test_lazy_state_handles_same_last_updated_and_last_changed(
last_updated_ts=now.timestamp(),
last_changed_ts=now.timestamp(),
)
lstate = LazyState(row, {}, None, row.entity_id, row.state, row.last_updated_ts)
lstate = LazyState(
row, {}, None, row.entity_id, row.state, row.last_updated_ts, False
)
assert lstate.as_dict() == {
"attributes": {"shared": True},
"entity_id": "sensor.valid",