diff --git a/homeassistant/components/logbook/websocket_api.py b/homeassistant/components/logbook/websocket_api.py index edfe3177f26..0bb7877b95b 100644 --- a/homeassistant/components/logbook/websocket_api.py +++ b/homeassistant/components/logbook/websocket_api.py @@ -75,6 +75,7 @@ async def _async_send_historical_events( end_time: dt, formatter: Callable[[int, Any], dict[str, Any]], event_processor: EventProcessor, + partial: bool, ) -> dt | None: """Select historical data from the database and deliver it to the websocket. @@ -94,88 +95,107 @@ async def _async_send_historical_events( ) if not is_big_query: - message, last_event_time = await _async_get_ws_formatted_events( + message, last_event_time = await _async_get_ws_stream_events( hass, msg_id, start_time, end_time, formatter, event_processor, + partial, ) - # If there is no last_time, there are no historical - # results, but we still send an empty message so + # If there is no last_event_time, there are no historical + # results, but we still send an empty message + # if its the last one (not partial) so # consumers of the api know their request was # answered but there were no results - connection.send_message(message) + if last_event_time or not partial: + connection.send_message(message) return last_event_time # This is a big query so we deliver # the first three hours and then # we fetch the old data recent_query_start = end_time - timedelta(hours=BIG_QUERY_RECENT_HOURS) - recent_message, recent_query_last_event_time = await _async_get_ws_formatted_events( + recent_message, recent_query_last_event_time = await _async_get_ws_stream_events( hass, msg_id, recent_query_start, end_time, formatter, event_processor, + partial=True, ) if recent_query_last_event_time: connection.send_message(recent_message) - older_message, older_query_last_event_time = await _async_get_ws_formatted_events( + older_message, older_query_last_event_time = await _async_get_ws_stream_events( hass, msg_id, start_time, recent_query_start, formatter, event_processor, + partial, ) - # If there is no last_time, there are no historical - # results, but we still send an empty message so + # If there is no last_event_time, there are no historical + # results, but we still send an empty message + # if its the last one (not partial) so # consumers of the api know their request was # answered but there were no results - if older_query_last_event_time or not recent_query_last_event_time: + if older_query_last_event_time or not partial: connection.send_message(older_message) # Returns the time of the newest event return recent_query_last_event_time or older_query_last_event_time -async def _async_get_ws_formatted_events( +async def _async_get_ws_stream_events( hass: HomeAssistant, msg_id: int, start_time: dt, end_time: dt, formatter: Callable[[int, Any], dict[str, Any]], event_processor: EventProcessor, + partial: bool, ) -> tuple[str, dt | None]: """Async wrapper around _ws_formatted_get_events.""" return await get_instance(hass).async_add_executor_job( - _ws_formatted_get_events, + _ws_stream_get_events, msg_id, start_time, end_time, formatter, event_processor, + partial, ) -def _ws_formatted_get_events( +def _ws_stream_get_events( msg_id: int, start_day: dt, end_day: dt, formatter: Callable[[int, Any], dict[str, Any]], event_processor: EventProcessor, + partial: bool, ) -> tuple[str, dt | None]: """Fetch events and convert them to json in the executor.""" events = event_processor.get_events(start_day, end_day) last_time = None if events: last_time = dt_util.utc_from_timestamp(events[-1]["when"]) - result = formatter(msg_id, events) - return JSON_DUMP(result), last_time + message = { + "events": events, + "start_time": dt_util.utc_to_timestamp(start_day), + "end_time": dt_util.utc_to_timestamp(end_day), + } + if partial: + # This is a hint to consumers of the api that + # we are about to send a another block of historical + # data in case the UI needs to show that historical + # data is still loading in the future + message["partial"] = True + return JSON_DUMP(formatter(msg_id, message)), last_time async def _async_events_consumer( @@ -209,7 +229,7 @@ async def _async_events_consumer( JSON_DUMP( messages.event_message( msg_id, - logbook_events, + {"events": logbook_events}, ) ) ) @@ -279,6 +299,7 @@ async def ws_event_stream( end_time, messages.event_message, event_processor, + partial=False, ) return @@ -331,6 +352,7 @@ async def ws_event_stream( subscriptions_setup_complete_time, messages.event_message, event_processor, + partial=True, ) await _async_wait_for_recorder_sync(hass) @@ -353,16 +375,16 @@ async def ws_event_stream( second_fetch_start_time = max( last_event_time or max_recorder_behind, max_recorder_behind ) - message, final_cutoff_time = await _async_get_ws_formatted_events( + await _async_send_historical_events( hass, + connection, msg_id, second_fetch_start_time, subscriptions_setup_complete_time, messages.event_message, event_processor, + partial=False, ) - if final_cutoff_time: # Only sends results if we have them - connection.send_message(message) if not subscriptions: # Unsubscribe happened while waiting for formatted events @@ -379,6 +401,20 @@ async def ws_event_stream( ) +def _ws_formatted_get_events( + msg_id: int, + start_time: dt, + end_time: dt, + event_processor: EventProcessor, +) -> str: + """Fetch events and convert them to json in the executor.""" + return JSON_DUMP( + messages.result_message( + msg_id, event_processor.get_events(start_time, end_time) + ) + ) + + @websocket_api.websocket_command( { vol.Required("type"): "logbook/get_events", @@ -438,12 +474,12 @@ async def ws_get_events( include_entity_name=False, ) - message, _ = await _async_get_ws_formatted_events( - hass, - msg["id"], - start_time, - end_time, - messages.result_message, - event_processor, + connection.send_message( + await hass.async_add_executor_job( + _ws_formatted_get_events, + msg["id"], + start_time, + end_time, + event_processor, + ) ) - connection.send_message(message) diff --git a/tests/components/logbook/test_websocket_api.py b/tests/components/logbook/test_websocket_api.py index 26ad7127296..8706ccf7617 100644 --- a/tests/components/logbook/test_websocket_api.py +++ b/tests/components/logbook/test_websocket_api.py @@ -492,13 +492,16 @@ async def test_subscribe_unsubscribe_logbook_stream( msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == "event" - assert msg["event"] == [ + assert msg["event"]["events"] == [ { "entity_id": "binary_sensor.is_light", "state": "off", "when": state.last_updated.timestamp(), } ] + assert msg["event"]["start_time"] == now.timestamp() + assert msg["event"]["end_time"] > msg["event"]["start_time"] + assert msg["event"]["partial"] is True hass.states.async_set("light.alpha", "on") hass.states.async_set("light.alpha", "off") @@ -520,7 +523,14 @@ async def test_subscribe_unsubscribe_logbook_stream( msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == "event" - assert msg["event"] == [ + assert "partial" not in msg["event"]["events"] + assert msg["event"]["events"] == [] + + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) + assert msg["id"] == 7 + assert msg["type"] == "event" + assert "partial" not in msg["event"]["events"] + assert msg["event"]["events"] == [ { "entity_id": "light.alpha", "state": "off", @@ -560,7 +570,7 @@ async def test_subscribe_unsubscribe_logbook_stream( msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == "event" - assert msg["event"] == [ + assert msg["event"]["events"] == [ { "context_id": ANY, "domain": "automation", @@ -624,7 +634,7 @@ async def test_subscribe_unsubscribe_logbook_stream( msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == "event" - assert msg["event"] == [ + assert msg["event"]["events"] == [ { "context_id": "ac5bd62de45711eaaeb351041eec8dd9", "context_user_id": "b400facee45711eaa9308bfd3d19e474", @@ -686,7 +696,7 @@ async def test_subscribe_unsubscribe_logbook_stream( msg = await websocket_client.receive_json() assert msg["id"] == 7 assert msg["type"] == "event" - assert msg["event"] == [ + assert msg["event"]["events"] == [ { "context_domain": "automation", "context_entity_id": "automation.alarm", @@ -716,7 +726,7 @@ async def test_subscribe_unsubscribe_logbook_stream( msg = await websocket_client.receive_json() assert msg["id"] == 7 assert msg["type"] == "event" - assert msg["event"] == [ + assert msg["event"]["events"] == [ { "context_domain": "automation", "context_entity_id": "automation.alarm", @@ -788,7 +798,10 @@ async def test_subscribe_unsubscribe_logbook_stream_entities( msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == "event" - assert msg["event"] == [ + assert "start_time" in msg["event"] + assert "end_time" in msg["event"] + assert msg["event"]["partial"] is True + assert msg["event"]["events"] == [ { "entity_id": "binary_sensor.is_light", "state": "off", @@ -805,7 +818,16 @@ async def test_subscribe_unsubscribe_logbook_stream_entities( msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == "event" - assert msg["event"] == [ + assert "start_time" in msg["event"] + assert "end_time" in msg["event"] + assert "partial" not in msg["event"] + assert msg["event"]["events"] == [] + + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) + assert msg["id"] == 7 + assert msg["type"] == "event" + assert "partial" not in msg["event"] + assert msg["event"]["events"] == [ { "entity_id": "light.small", "state": "off", @@ -871,7 +893,8 @@ async def test_subscribe_unsubscribe_logbook_stream_entities_with_end_time( msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == "event" - assert msg["event"] == [ + assert msg["event"]["partial"] is True + assert msg["event"]["events"] == [ { "entity_id": "binary_sensor.is_light", "state": "off", @@ -888,7 +911,14 @@ async def test_subscribe_unsubscribe_logbook_stream_entities_with_end_time( msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == "event" - assert msg["event"] == [ + assert "partial" not in msg["event"] + assert msg["event"]["events"] == [] + + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) + assert msg["id"] == 7 + assert msg["type"] == "event" + assert "partial" not in msg["event"] + assert msg["event"]["events"] == [ { "entity_id": "light.small", "state": "off", @@ -962,7 +992,7 @@ async def test_subscribe_unsubscribe_logbook_stream_entities_past_only( msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == "event" - assert msg["event"] == [ + assert msg["event"]["events"] == [ { "entity_id": "binary_sensor.is_light", "state": "off", @@ -1048,7 +1078,7 @@ async def test_subscribe_unsubscribe_logbook_stream_big_query( msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == "event" - assert msg["event"] == [ + assert msg["event"]["events"] == [ { "entity_id": "binary_sensor.is_light", "state": "on", @@ -1060,7 +1090,8 @@ async def test_subscribe_unsubscribe_logbook_stream_big_query( msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == "event" - assert msg["event"] == [ + assert msg["event"]["partial"] is True + assert msg["event"]["events"] == [ { "entity_id": "binary_sensor.four_days_ago", "state": "off", @@ -1068,6 +1099,13 @@ async def test_subscribe_unsubscribe_logbook_stream_big_query( } ] + # And finally a response without partial set to indicate no more + # historical data is coming + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) + assert msg["id"] == 7 + assert msg["type"] == "event" + assert msg["event"]["events"] == [] + await websocket_client.send_json( {"id": 8, "type": "unsubscribe_events", "subscription": 7} ) @@ -1123,7 +1161,7 @@ async def test_subscribe_unsubscribe_logbook_stream_device( msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == "event" - assert msg["event"] == [] + assert msg["event"]["events"] == [] hass.bus.async_fire("mock_event", {"device_id": device.id}) await hass.async_block_till_done() @@ -1131,7 +1169,7 @@ async def test_subscribe_unsubscribe_logbook_stream_device( msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == "event" - assert msg["event"] == [ + assert msg["event"]["events"] == [ {"domain": "test", "message": "is on fire", "name": "device name", "when": ANY} ] @@ -1250,7 +1288,7 @@ async def test_live_stream_with_one_second_commit_interval( msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == "event" - recieved_rows.extend(msg["event"]) + recieved_rows.extend(msg["event"]["events"]) hass.bus.async_fire("mock_event", {"device_id": device.id, "message": "6"}) @@ -1262,7 +1300,7 @@ async def test_live_stream_with_one_second_commit_interval( msg = await asyncio.wait_for(websocket_client.receive_json(), 2.5) assert msg["id"] == 7 assert msg["type"] == "event" - recieved_rows.extend(msg["event"]) + recieved_rows.extend(msg["event"]["events"]) # Make sure we get rows back in order assert recieved_rows == [ @@ -1326,7 +1364,7 @@ async def test_subscribe_disconnected(hass, recorder_mock, hass_ws_client): msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == "event" - assert msg["event"] == [ + assert msg["event"]["events"] == [ { "entity_id": "binary_sensor.is_light", "state": "off", @@ -1437,7 +1475,7 @@ async def test_recorder_is_far_behind(hass, recorder_mock, hass_ws_client, caplo msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == "event" - assert msg["event"] == [] + assert msg["event"]["events"] == [] hass.bus.async_fire("mock_event", {"device_id": device.id, "message": "1"}) await hass.async_block_till_done() @@ -1445,7 +1483,7 @@ async def test_recorder_is_far_behind(hass, recorder_mock, hass_ws_client, caplo msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == "event" - assert msg["event"] == [ + assert msg["event"]["events"] == [ {"domain": "test", "message": "1", "name": "device name", "when": ANY} ] @@ -1455,7 +1493,7 @@ async def test_recorder_is_far_behind(hass, recorder_mock, hass_ws_client, caplo msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == "event" - assert msg["event"] == [ + assert msg["event"]["events"] == [ {"domain": "test", "message": "2", "name": "device name", "when": ANY} ]