From f251c464e2f1b911a80af21eb3119224979ee42b Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Thu, 25 May 2023 21:18:19 -0500 Subject: [PATCH] Fix history start time state returning None with some postgresql versions (#93555) * Add additional coverage to history websocket api related issue #93258 * Add additional coverage to history websocket api related issue #93258 * Fix results when union query ends up at the end instead of front * Apply suggestions from code review * resort * zero instead * fix exception * fix tests --- .../components/recorder/history/modern.py | 61 ++++---- .../components/recorder/models/state.py | 11 +- .../components/history/test_websocket_api.py | 143 +++++++++++++----- tests/components/recorder/test_models.py | 14 +- 4 files changed, 147 insertions(+), 82 deletions(-) diff --git a/homeassistant/components/recorder/history/modern.py b/homeassistant/components/recorder/history/modern.py index 5322074c205..393bcfa3676 100644 --- a/homeassistant/components/recorder/history/modern.py +++ b/homeassistant/components/recorder/history/modern.py @@ -9,7 +9,6 @@ from typing import Any, cast from sqlalchemy import ( CompoundSelect, - Integer, Select, Subquery, and_, @@ -19,7 +18,6 @@ from sqlalchemy import ( select, union_all, ) -from sqlalchemy.dialects import postgresql from sqlalchemy.engine.row import Row from sqlalchemy.orm.session import Session @@ -52,16 +50,6 @@ _FIELD_MAP = { } -CASTABLE_DOUBLE_TYPE = ( - # MySQL/MariaDB < 10.4+ does not support casting to DOUBLE so we have to use Integer instead but it doesn't - # matter because we don't use the value as its always set to NULL - # - # sqlalchemy.exc.SAWarning: Datatype DOUBLE does not support CAST on MySQL/MariaDb; the CAST will be skipped. - # - Integer().with_variant(postgresql.DOUBLE_PRECISION(), "postgresql") -) - - def _stmt_and_join_attributes( no_attributes: bool, include_last_changed: bool ) -> Select: @@ -79,13 +67,9 @@ def _stmt_and_join_attributes_for_start_state( ) -> Select: """Return the statement and if StateAttributes should be joined.""" _select = select(States.metadata_id, States.state) - _select = _select.add_columns( - literal(value=None).label("last_updated_ts").cast(CASTABLE_DOUBLE_TYPE) - ) + _select = _select.add_columns(literal(value=0).label("last_updated_ts")) if include_last_changed: - _select = _select.add_columns( - literal(value=None).label("last_changed_ts").cast(CASTABLE_DOUBLE_TYPE) - ) + _select = _select.add_columns(literal(value=0).label("last_changed_ts")) if not no_attributes: _select = _select.add_columns(SHARED_ATTR_OR_LEGACY_ATTRIBUTES) return _select @@ -174,28 +158,29 @@ def _significant_states_stmt( stmt = stmt.outerjoin( StateAttributes, States.attributes_id == StateAttributes.attributes_id ) - stmt = stmt.order_by(States.metadata_id, States.last_updated_ts) if not include_start_time_state or not run_start_ts: + stmt = stmt.order_by(States.metadata_id, States.last_updated_ts) return stmt - return _select_from_subquery( - union_all( - _select_from_subquery( - _get_start_time_state_stmt( - run_start_ts, - start_time_ts, - single_metadata_id, - metadata_ids, - no_attributes, - include_last_changed, - ).subquery(), + unioned_subquery = union_all( + _select_from_subquery( + _get_start_time_state_stmt( + run_start_ts, + start_time_ts, + single_metadata_id, + metadata_ids, no_attributes, include_last_changed, - ), - _select_from_subquery(stmt.subquery(), no_attributes, include_last_changed), - ).subquery(), + ).subquery(), + no_attributes, + include_last_changed, + ), + _select_from_subquery(stmt.subquery(), no_attributes, include_last_changed), + ).subquery() + return _select_from_subquery( + unioned_subquery, no_attributes, include_last_changed, - ) + ).order_by(unioned_subquery.c.metadata_id, unioned_subquery.c.last_updated_ts) def get_significant_states_with_session( @@ -279,6 +264,7 @@ def get_significant_states_with_session( entity_id_to_metadata_id, minimal_response, compressed_state_format, + no_attributes=no_attributes, ) @@ -433,6 +419,7 @@ def state_changes_during_period( entity_ids, entity_id_to_metadata_id, descending=descending, + no_attributes=no_attributes, ), ) @@ -528,6 +515,7 @@ def get_last_state_changes( None, entity_ids, entity_id_to_metadata_id, + no_attributes=False, ), ) @@ -651,6 +639,7 @@ def _sorted_states_to_dict( minimal_response: bool = False, compressed_state_format: bool = False, descending: bool = False, + no_attributes: bool = False, ) -> MutableMapping[str, list[State | dict[str, Any]]]: """Convert SQL results into JSON friendly data structure. @@ -665,7 +654,7 @@ def _sorted_states_to_dict( """ field_map = _FIELD_MAP state_class: Callable[ - [Row, dict[str, dict[str, Any]], float | None, str, str, float | None], + [Row, dict[str, dict[str, Any]], float | None, str, str, float | None, bool], State | dict[str, Any], ] if compressed_state_format: @@ -716,6 +705,7 @@ def _sorted_states_to_dict( entity_id, db_state[state_idx], db_state[last_updated_ts_idx], + False, ) for db_state in group ) @@ -738,6 +728,7 @@ def _sorted_states_to_dict( entity_id, prev_state, # type: ignore[arg-type] first_state[last_updated_ts_idx], + no_attributes, ) ) diff --git a/homeassistant/components/recorder/models/state.py b/homeassistant/components/recorder/models/state.py index 523ffdf1852..73e7798b9f5 100644 --- a/homeassistant/components/recorder/models/state.py +++ b/homeassistant/components/recorder/models/state.py @@ -53,6 +53,7 @@ class LazyState(State): entity_id: str, state: str, last_updated_ts: float | None, + no_attributes: bool, ) -> None: """Init the lazy state.""" self._row = row @@ -143,14 +144,14 @@ def row_to_compressed_state( entity_id: str, state: str, last_updated_ts: float | None, + no_attributes: bool, ) -> dict[str, Any]: """Convert a database row to a compressed state schema 41 and later.""" - comp_state: dict[str, Any] = { - COMPRESSED_STATE_STATE: state, - COMPRESSED_STATE_ATTRIBUTES: decode_attributes_from_source( + comp_state: dict[str, Any] = {COMPRESSED_STATE_STATE: state} + if not no_attributes: + comp_state[COMPRESSED_STATE_ATTRIBUTES] = decode_attributes_from_source( getattr(row, "attributes", None), attr_cache - ), - } + ) row_last_updated_ts: float = last_updated_ts or start_time_ts # type: ignore[assignment] comp_state[COMPRESSED_STATE_LAST_UPDATED] = row_last_updated_ts if ( diff --git a/tests/components/history/test_websocket_api.py b/tests/components/history/test_websocket_api.py index f8d4ec7d9f7..4f00e50def1 100644 --- a/tests/components/history/test_websocket_api.py +++ b/tests/components/history/test_websocket_api.py @@ -1,5 +1,6 @@ """The tests the History component websocket_api.""" # pylint: disable=protected-access,invalid-name +import asyncio from datetime import timedelta from unittest.mock import patch @@ -97,7 +98,7 @@ async def test_history_during_period( assert len(sensor_test_history) == 3 assert sensor_test_history[0]["s"] == "on" - assert sensor_test_history[0]["a"] == {} + assert "a" not in sensor_test_history[0] # no_attributes = True assert isinstance(sensor_test_history[0]["lu"], float) assert "lc" not in sensor_test_history[0] # skipped if the same a last_updated (lu) @@ -510,17 +511,13 @@ async def test_history_stream_historical_only( "start_time": now.timestamp(), "states": { "sensor.four": [ - {"a": {}, "lu": sensor_four_last_updated.timestamp(), "s": "off"} - ], - "sensor.one": [ - {"a": {}, "lu": sensor_one_last_updated.timestamp(), "s": "on"} + {"lu": sensor_four_last_updated.timestamp(), "s": "off"} ], + "sensor.one": [{"lu": sensor_one_last_updated.timestamp(), "s": "on"}], "sensor.three": [ - {"a": {}, "lu": sensor_three_last_updated.timestamp(), "s": "off"} - ], - "sensor.two": [ - {"a": {}, "lu": sensor_two_last_updated.timestamp(), "s": "off"} + {"lu": sensor_three_last_updated.timestamp(), "s": "off"} ], + "sensor.two": [{"lu": sensor_two_last_updated.timestamp(), "s": "off"}], }, }, "id": 1, @@ -857,12 +854,8 @@ async def test_history_stream_live_no_attributes_minimal_response( "end_time": first_end_time, "start_time": now.timestamp(), "states": { - "sensor.one": [ - {"a": {}, "lu": sensor_one_last_updated.timestamp(), "s": "on"} - ], - "sensor.two": [ - {"a": {}, "lu": sensor_two_last_updated.timestamp(), "s": "off"} - ], + "sensor.one": [{"lu": sensor_one_last_updated.timestamp(), "s": "on"}], + "sensor.two": [{"lu": sensor_two_last_updated.timestamp(), "s": "off"}], }, }, "id": 1, @@ -1220,12 +1213,8 @@ async def test_history_stream_live_no_attributes_minimal_response_specific_entit "end_time": first_end_time, "start_time": now.timestamp(), "states": { - "sensor.one": [ - {"a": {}, "lu": sensor_one_last_updated.timestamp(), "s": "on"} - ], - "sensor.two": [ - {"a": {}, "lu": sensor_two_last_updated.timestamp(), "s": "off"} - ], + "sensor.one": [{"lu": sensor_one_last_updated.timestamp(), "s": "on"}], + "sensor.two": [{"lu": sensor_two_last_updated.timestamp(), "s": "off"}], }, }, "id": 1, @@ -1306,12 +1295,8 @@ async def test_history_stream_live_with_future_end_time( "end_time": first_end_time, "start_time": now.timestamp(), "states": { - "sensor.one": [ - {"a": {}, "lu": sensor_one_last_updated.timestamp(), "s": "on"} - ], - "sensor.two": [ - {"a": {}, "lu": sensor_two_last_updated.timestamp(), "s": "off"} - ], + "sensor.one": [{"lu": sensor_one_last_updated.timestamp(), "s": "on"}], + "sensor.two": [{"lu": sensor_two_last_updated.timestamp(), "s": "off"}], }, }, "id": 1, @@ -1505,10 +1490,10 @@ async def test_overflow_queue( "start_time": now.timestamp(), "states": { "sensor.one": [ - {"a": {}, "lu": sensor_one_last_updated.timestamp(), "s": "on"} + {"lu": sensor_one_last_updated.timestamp(), "s": "on"} ], "sensor.two": [ - {"a": {}, "lu": sensor_two_last_updated.timestamp(), "s": "off"} + {"lu": sensor_two_last_updated.timestamp(), "s": "off"} ], }, }, @@ -1722,9 +1707,7 @@ async def test_history_stream_for_invalid_entity_ids( "end_time": sensor_one_last_updated.timestamp(), "start_time": now.timestamp(), "states": { - "sensor.one": [ - {"a": {}, "lu": sensor_one_last_updated.timestamp(), "s": "on"} - ], + "sensor.one": [{"lu": sensor_one_last_updated.timestamp(), "s": "on"}], }, }, "id": 1, @@ -1754,12 +1737,8 @@ async def test_history_stream_for_invalid_entity_ids( "end_time": sensor_two_last_updated.timestamp(), "start_time": now.timestamp(), "states": { - "sensor.one": [ - {"a": {}, "lu": sensor_one_last_updated.timestamp(), "s": "on"} - ], - "sensor.two": [ - {"a": {}, "lu": sensor_two_last_updated.timestamp(), "s": "off"} - ], + "sensor.one": [{"lu": sensor_one_last_updated.timestamp(), "s": "on"}], + "sensor.two": [{"lu": sensor_two_last_updated.timestamp(), "s": "off"}], }, }, "id": 2, @@ -1843,3 +1822,91 @@ async def test_history_stream_for_invalid_entity_ids( "type": "result", "success": False, } + + +async def test_history_stream_historical_only_with_start_time_state_past( + recorder_mock: Recorder, hass: HomeAssistant, hass_ws_client: WebSocketGenerator +) -> None: + """Test history stream.""" + await async_setup_component( + hass, + "history", + {}, + ) + await async_setup_component(hass, "sensor", {}) + + hass.states.async_set("sensor.one", "first", attributes={"any": "attr"}) + hass.states.get("sensor.one").last_updated + await async_recorder_block_till_done(hass) + + await asyncio.sleep(0.00002) + now = dt_util.utcnow() + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.one", "second", attributes={"any": "attr"}) + sensor_one_last_updated_second = hass.states.get("sensor.one").last_updated + + await asyncio.sleep(0.00001) + hass.states.async_set("sensor.one", "third", attributes={"any": "attr"}) + sensor_one_last_updated_third = hass.states.get("sensor.one").last_updated + + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.two", "off", attributes={"any": "attr"}) + sensor_two_last_updated = hass.states.get("sensor.two").last_updated + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.three", "off", attributes={"any": "changed"}) + sensor_three_last_updated = hass.states.get("sensor.three").last_updated + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.four", "off", attributes={"any": "again"}) + sensor_four_last_updated = hass.states.get("sensor.four").last_updated + await async_recorder_block_till_done(hass) + hass.states.async_set("switch.excluded", "off", attributes={"any": "again"}) + await async_wait_recording_done(hass) + + end_time = dt_util.utcnow() + + client = await hass_ws_client() + await client.send_json( + { + "id": 1, + "type": "history/stream", + "entity_ids": ["sensor.one", "sensor.two", "sensor.three", "sensor.four"], + "start_time": now.isoformat(), + "end_time": end_time.isoformat(), + "include_start_time_state": True, + "significant_changes_only": False, + "no_attributes": True, + "minimal_response": True, + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 1 + assert response["type"] == "result" + + response = await client.receive_json() + + assert response == { + "event": { + "end_time": sensor_four_last_updated.timestamp(), + "start_time": now.timestamp(), + "states": { + "sensor.four": [ + {"lu": sensor_four_last_updated.timestamp(), "s": "off"} + ], + "sensor.one": [ + { + "lu": now.timestamp(), + "s": "first", + }, # should use start time state + {"lu": sensor_one_last_updated_second.timestamp(), "s": "second"}, + {"lu": sensor_one_last_updated_third.timestamp(), "s": "third"}, + ], + "sensor.three": [ + {"lu": sensor_three_last_updated.timestamp(), "s": "off"} + ], + "sensor.two": [{"lu": sensor_two_last_updated.timestamp(), "s": "off"}], + }, + }, + "id": 1, + "type": "event", + } diff --git a/tests/components/recorder/test_models.py b/tests/components/recorder/test_models.py index 8797c153a7b..d1058c38b70 100644 --- a/tests/components/recorder/test_models.py +++ b/tests/components/recorder/test_models.py @@ -271,7 +271,7 @@ async def test_lazy_state_handles_include_json( entity_id="sensor.invalid", shared_attrs="{INVALID_JSON}", ) - assert LazyState(row, {}, None, row.entity_id, "", 1).attributes == {} + assert LazyState(row, {}, None, row.entity_id, "", 1, False).attributes == {} assert "Error converting row to state attributes" in caplog.text @@ -283,7 +283,9 @@ async def test_lazy_state_can_decode_attributes( entity_id="sensor.invalid", attributes='{"shared":true}', ) - assert LazyState(row, {}, None, row.entity_id, "", 1).attributes == {"shared": True} + assert LazyState(row, {}, None, row.entity_id, "", 1, False).attributes == { + "shared": True + } async def test_lazy_state_handles_different_last_updated_and_last_changed( @@ -298,7 +300,9 @@ async def test_lazy_state_handles_different_last_updated_and_last_changed( last_updated_ts=now.timestamp(), last_changed_ts=(now - timedelta(seconds=60)).timestamp(), ) - lstate = LazyState(row, {}, None, row.entity_id, row.state, row.last_updated_ts) + lstate = LazyState( + row, {}, None, row.entity_id, row.state, row.last_updated_ts, False + ) assert lstate.as_dict() == { "attributes": {"shared": True}, "entity_id": "sensor.valid", @@ -329,7 +333,9 @@ async def test_lazy_state_handles_same_last_updated_and_last_changed( last_updated_ts=now.timestamp(), last_changed_ts=now.timestamp(), ) - lstate = LazyState(row, {}, None, row.entity_id, row.state, row.last_updated_ts) + lstate = LazyState( + row, {}, None, row.entity_id, row.state, row.last_updated_ts, False + ) assert lstate.as_dict() == { "attributes": {"shared": True}, "entity_id": "sensor.valid",