From fcb6888f87d7688e978c061bb13012ef787f2f33 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Fri, 9 Sep 2022 17:16:02 -0500 Subject: [PATCH] Start logbook stream faster (#77921) Co-authored-by: Paulus Schoutsen --- .../components/logbook/websocket_api.py | 82 +++++------- .../components/logbook/test_websocket_api.py | 117 +++++++++++------- 2 files changed, 106 insertions(+), 93 deletions(-) diff --git a/homeassistant/components/logbook/websocket_api.py b/homeassistant/components/logbook/websocket_api.py index 3af87b26caa..d28666963c8 100644 --- a/homeassistant/components/logbook/websocket_api.py +++ b/homeassistant/components/logbook/websocket_api.py @@ -31,7 +31,6 @@ from .processor import EventProcessor MAX_PENDING_LOGBOOK_EVENTS = 2048 EVENT_COALESCE_TIME = 0.35 -MAX_RECORDER_WAIT = 10 # minimum size that we will split the query BIG_QUERY_HOURS = 25 # how many hours to deliver in the first chunk when we split the query @@ -48,6 +47,7 @@ class LogbookLiveStream: subscriptions: list[CALLBACK_TYPE] end_time_unsub: CALLBACK_TYPE | None = None task: asyncio.Task | None = None + wait_sync_task: asyncio.Task | None = None @callback @@ -57,18 +57,6 @@ def async_setup(hass: HomeAssistant) -> None: websocket_api.async_register_command(hass, ws_event_stream) -async def _async_wait_for_recorder_sync(hass: HomeAssistant) -> None: - """Wait for the recorder to sync.""" - try: - await asyncio.wait_for( - get_instance(hass).async_block_till_done(), MAX_RECORDER_WAIT - ) - except asyncio.TimeoutError: - _LOGGER.debug( - "Recorder is behind more than %s seconds, starting live stream; Some results may be missing" - ) - - @callback def _async_send_empty_response( connection: ActiveConnection, msg_id: int, start_time: dt, end_time: dt | None @@ -347,8 +335,11 @@ async def ws_event_stream( subscriptions.clear() if live_stream.task: live_stream.task.cancel() + if live_stream.wait_sync_task: + live_stream.wait_sync_task.cancel() if live_stream.end_time_unsub: live_stream.end_time_unsub() + live_stream.end_time_unsub = None if end_time: live_stream.end_time_unsub = async_track_point_in_utc_time( @@ -395,43 +386,6 @@ async def ws_event_stream( partial=True, ) - await _async_wait_for_recorder_sync(hass) - if msg_id not in connection.subscriptions: - # Unsubscribe happened while waiting for recorder - return - - # - # Fetch any events from the database that have - # not been committed since the original fetch - # so we can switch over to using the subscriptions - # - # We only want events that happened after the last event - # we had from the last database query or the maximum - # time we allow the recorder to be behind - # - max_recorder_behind = subscriptions_setup_complete_time - timedelta( - seconds=MAX_RECORDER_WAIT - ) - second_fetch_start_time = max( - last_event_time or max_recorder_behind, max_recorder_behind - ) - await _async_send_historical_events( - hass, - connection, - msg_id, - second_fetch_start_time, - subscriptions_setup_complete_time, - messages.event_message, - event_processor, - partial=False, - ) - - if not subscriptions: - # Unsubscribe happened while waiting for formatted events - # or there are no supported entities (all UOM or state class) - # or devices - return - live_stream.task = asyncio.create_task( _async_events_consumer( subscriptions_setup_complete_time, @@ -442,6 +396,34 @@ async def ws_event_stream( ) ) + if msg_id not in connection.subscriptions: + # Unsubscribe happened while sending historical events + return + + live_stream.wait_sync_task = asyncio.create_task( + get_instance(hass).async_block_till_done() + ) + await live_stream.wait_sync_task + + # + # Fetch any events from the database that have + # not been committed since the original fetch + # so we can switch over to using the subscriptions + # + # We only want events that happened after the last event + # we had from the last database query + # + await _async_send_historical_events( + hass, + connection, + msg_id, + last_event_time or start_time, + subscriptions_setup_complete_time, + messages.event_message, + event_processor, + partial=False, + ) + def _ws_formatted_get_events( msg_id: int, diff --git a/tests/components/logbook/test_websocket_api.py b/tests/components/logbook/test_websocket_api.py index ec4f2183a9a..4b2c40f41c0 100644 --- a/tests/components/logbook/test_websocket_api.py +++ b/tests/components/logbook/test_websocket_api.py @@ -11,6 +11,7 @@ from homeassistant import core from homeassistant.components import logbook, recorder from homeassistant.components.automation import ATTR_SOURCE, EVENT_AUTOMATION_TRIGGERED from homeassistant.components.logbook import websocket_api +from homeassistant.components.recorder.util import get_instance from homeassistant.components.script import EVENT_SCRIPT_STARTED from homeassistant.components.websocket_api.const import TYPE_RESULT from homeassistant.const import ( @@ -566,6 +567,15 @@ async def test_subscribe_unsubscribe_logbook_stream_excluded_entities( assert msg["event"]["end_time"] > msg["event"]["start_time"] assert msg["event"]["partial"] is True + await get_instance(hass).async_block_till_done() + await hass.async_block_till_done() + + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) + assert msg["id"] == 7 + assert msg["type"] == "event" + assert "partial" not in msg["event"]["events"] + assert msg["event"]["events"] == [] + hass.states.async_set("light.exc", STATE_ON) hass.states.async_set("light.exc", STATE_OFF) hass.states.async_set("switch.any", STATE_ON) @@ -588,12 +598,8 @@ async def test_subscribe_unsubscribe_logbook_stream_excluded_entities( await hass.async_block_till_done() hass.states.async_set("light.zulu", "on", {"effect": "help", "color": "blue"}) - - msg = await asyncio.wait_for(websocket_client.receive_json(), 2) - assert msg["id"] == 7 - assert msg["type"] == "event" - assert "partial" not in msg["event"]["events"] - assert msg["event"]["events"] == [] + await get_instance(hass).async_block_till_done() + await hass.async_block_till_done() msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 @@ -747,6 +753,13 @@ async def test_subscribe_unsubscribe_logbook_stream_included_entities( assert msg["event"]["end_time"] > msg["event"]["start_time"] assert msg["event"]["partial"] is True + await hass.async_block_till_done() + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) + assert msg["id"] == 7 + assert msg["type"] == "event" + assert "partial" not in msg["event"]["events"] + assert msg["event"]["events"] == [] + for entity_id in test_entities: hass.states.async_set(entity_id, STATE_ON) hass.states.async_set(entity_id, STATE_OFF) @@ -756,12 +769,7 @@ async def test_subscribe_unsubscribe_logbook_stream_included_entities( await hass.async_block_till_done() hass.states.async_set("light.zulu", "on", {"effect": "help", "color": "blue"}) - - msg = await asyncio.wait_for(websocket_client.receive_json(), 2) - assert msg["id"] == 7 - assert msg["type"] == "event" - assert "partial" not in msg["event"]["events"] - assert msg["event"]["events"] == [] + await hass.async_block_till_done() msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 @@ -958,6 +966,14 @@ async def test_logbook_stream_excluded_entities_inherits_filters_from_recorder( assert msg["event"]["end_time"] > msg["event"]["start_time"] assert msg["event"]["partial"] is True + await hass.async_block_till_done() + + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) + assert msg["id"] == 7 + assert msg["type"] == "event" + assert "partial" not in msg["event"]["events"] + assert msg["event"]["events"] == [] + hass.states.async_set("light.exc", STATE_ON) hass.states.async_set("light.exc", STATE_OFF) hass.states.async_set("switch.any", STATE_ON) @@ -982,12 +998,7 @@ async def test_logbook_stream_excluded_entities_inherits_filters_from_recorder( await hass.async_block_till_done() hass.states.async_set("light.zulu", "on", {"effect": "help", "color": "blue"}) - - msg = await asyncio.wait_for(websocket_client.receive_json(), 2) - assert msg["id"] == 7 - assert msg["type"] == "event" - assert "partial" not in msg["event"]["events"] - assert msg["event"]["events"] == [] + await hass.async_block_till_done() msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 @@ -1121,6 +1132,14 @@ async def test_subscribe_unsubscribe_logbook_stream( assert msg["event"]["end_time"] > msg["event"]["start_time"] assert msg["event"]["partial"] is True + await hass.async_block_till_done() + + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) + assert msg["id"] == 7 + assert msg["type"] == "event" + assert "partial" not in msg["event"]["events"] + assert msg["event"]["events"] == [] + hass.states.async_set("light.alpha", "on") hass.states.async_set("light.alpha", "off") alpha_off_state: State = hass.states.get("light.alpha") @@ -1138,12 +1157,6 @@ async def test_subscribe_unsubscribe_logbook_stream( hass.states.async_set("light.zulu", "on", {"effect": "help", "color": "blue"}) - msg = await asyncio.wait_for(websocket_client.receive_json(), 2) - assert msg["id"] == 7 - assert msg["type"] == "event" - assert "partial" not in msg["event"]["events"] - assert msg["event"]["events"] == [] - msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == "event" @@ -1427,12 +1440,8 @@ async def test_subscribe_unsubscribe_logbook_stream_entities( } ] - hass.states.async_set("light.alpha", STATE_ON) - hass.states.async_set("light.alpha", STATE_OFF) - hass.states.async_set("light.small", STATE_OFF, {"effect": "help", "color": "blue"}) - + await get_instance(hass).async_block_till_done() await hass.async_block_till_done() - msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == "event" @@ -1441,6 +1450,13 @@ async def test_subscribe_unsubscribe_logbook_stream_entities( assert "partial" not in msg["event"] assert msg["event"]["events"] == [] + hass.states.async_set("light.alpha", STATE_ON) + hass.states.async_set("light.alpha", STATE_OFF) + hass.states.async_set("light.small", STATE_OFF, {"effect": "help", "color": "blue"}) + + await get_instance(hass).async_block_till_done() + await hass.async_block_till_done() + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == "event" @@ -1455,6 +1471,7 @@ async def test_subscribe_unsubscribe_logbook_stream_entities( hass.states.async_remove("light.alpha") hass.states.async_remove("light.small") + await get_instance(hass).async_block_till_done() await hass.async_block_till_done() await websocket_client.send_json( @@ -1520,10 +1537,7 @@ async def test_subscribe_unsubscribe_logbook_stream_entities_with_end_time( } ] - hass.states.async_set("light.alpha", STATE_ON) - hass.states.async_set("light.alpha", STATE_OFF) - hass.states.async_set("light.small", STATE_OFF, {"effect": "help", "color": "blue"}) - + await get_instance(hass).async_block_till_done() await hass.async_block_till_done() msg = await asyncio.wait_for(websocket_client.receive_json(), 2) @@ -1532,6 +1546,12 @@ async def test_subscribe_unsubscribe_logbook_stream_entities_with_end_time( assert "partial" not in msg["event"] assert msg["event"]["events"] == [] + hass.states.async_set("light.alpha", STATE_ON) + hass.states.async_set("light.alpha", STATE_OFF) + hass.states.async_set("light.small", STATE_OFF, {"effect": "help", "color": "blue"}) + + await hass.async_block_till_done() + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == "event" @@ -2176,7 +2196,6 @@ async def test_stream_consumer_stop_processing(hass, recorder_mock, hass_ws_clie @patch("homeassistant.components.logbook.websocket_api.EVENT_COALESCE_TIME", 0) -@patch("homeassistant.components.logbook.websocket_api.MAX_RECORDER_WAIT", 0.15) async def test_recorder_is_far_behind(hass, recorder_mock, hass_ws_client, caplog): """Test we still start live streaming if the recorder is far behind.""" now = dt_util.utcnow() @@ -2250,8 +2269,6 @@ async def test_recorder_is_far_behind(hass, recorder_mock, hass_ws_client, caplo assert msg["type"] == TYPE_RESULT assert msg["success"] - assert "Recorder is behind" in caplog.text - @patch("homeassistant.components.logbook.websocket_api.EVENT_COALESCE_TIME", 0) async def test_subscribe_all_entities_are_continuous( @@ -2409,16 +2426,28 @@ async def test_subscribe_entities_some_have_uom_multiple( assert msg["type"] == TYPE_RESULT assert msg["success"] - _cycle_entities() + await get_instance(hass).async_block_till_done() + await hass.async_block_till_done() msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == "event" - assert msg["event"]["partial"] is True assert msg["event"]["events"] == [ {"entity_id": "sensor.keep", "state": "off", "when": ANY}, {"entity_id": "sensor.keep_two", "state": "off", "when": ANY}, ] + assert msg["event"]["partial"] is True + + await get_instance(hass).async_block_till_done() + await hass.async_block_till_done() + _cycle_entities() + await hass.async_block_till_done() + + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) + assert msg["id"] == 7 + assert msg["type"] == "event" + assert "partial" not in msg["event"] + assert msg["event"]["events"] == [] _cycle_entities() await hass.async_block_till_done() @@ -2426,7 +2455,12 @@ async def test_subscribe_entities_some_have_uom_multiple( msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == "event" - assert msg["event"]["events"] == [] + assert msg["event"]["events"] == [ + {"entity_id": "sensor.keep", "state": "on", "when": ANY}, + {"entity_id": "sensor.keep", "state": "off", "when": ANY}, + {"entity_id": "sensor.keep_two", "state": "on", "when": ANY}, + {"entity_id": "sensor.keep_two", "state": "off", "when": ANY}, + ] assert "partial" not in msg["event"] msg = await asyncio.wait_for(websocket_client.receive_json(), 2) @@ -2437,11 +2471,8 @@ async def test_subscribe_entities_some_have_uom_multiple( {"entity_id": "sensor.keep", "state": "off", "when": ANY}, {"entity_id": "sensor.keep_two", "state": "on", "when": ANY}, {"entity_id": "sensor.keep_two", "state": "off", "when": ANY}, - {"entity_id": "sensor.keep", "state": "on", "when": ANY}, - {"entity_id": "sensor.keep", "state": "off", "when": ANY}, - {"entity_id": "sensor.keep_two", "state": "on", "when": ANY}, - {"entity_id": "sensor.keep_two", "state": "off", "when": ANY}, ] + assert "partial" not in msg["event"] await websocket_client.close()