From 9d95b9ab053b2106dc99f58805a2bfb862a2fb93 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 23 May 2022 17:40:00 -0500 Subject: [PATCH] Chunk large logbook queries and add an end_time to the api so we stop sending events (#72351) --- homeassistant/components/logbook/models.py | 2 +- .../components/logbook/websocket_api.py | 206 +++++++++-- .../components/logbook/test_websocket_api.py | 340 ++++++++++++++++-- 3 files changed, 483 insertions(+), 65 deletions(-) diff --git a/homeassistant/components/logbook/models.py b/homeassistant/components/logbook/models.py index e4844751c6a..591781745f7 100644 --- a/homeassistant/components/logbook/models.py +++ b/homeassistant/components/logbook/models.py @@ -59,7 +59,7 @@ class LazyEventPartialState: ) -@dataclass +@dataclass(frozen=True) class EventAsRow: """Convert an event to a row.""" diff --git a/homeassistant/components/logbook/websocket_api.py b/homeassistant/components/logbook/websocket_api.py index efb15efe97b..edfe3177f26 100644 --- a/homeassistant/components/logbook/websocket_api.py +++ b/homeassistant/components/logbook/websocket_api.py @@ -3,6 +3,7 @@ from __future__ import annotations import asyncio from collections.abc import Callable +from dataclasses import dataclass from datetime import datetime as dt, timedelta import logging from typing import Any @@ -15,6 +16,7 @@ from homeassistant.components.websocket_api import messages from homeassistant.components.websocket_api.connection import ActiveConnection from homeassistant.components.websocket_api.const import JSON_DUMP from homeassistant.core import CALLBACK_TYPE, Event, HomeAssistant, callback +from homeassistant.helpers.event import async_track_point_in_utc_time import homeassistant.util.dt as dt_util from .helpers import ( @@ -26,12 +28,26 @@ from .models import async_event_to_row from .processor import EventProcessor MAX_PENDING_LOGBOOK_EVENTS = 2048 -EVENT_COALESCE_TIME = 0.5 +EVENT_COALESCE_TIME = 0.35 MAX_RECORDER_WAIT = 10 +# minimum size that we will split the query +BIG_QUERY_HOURS = 25 +# how many hours to deliver in the first chunk when we split the query +BIG_QUERY_RECENT_HOURS = 24 _LOGGER = logging.getLogger(__name__) +@dataclass +class LogbookLiveStream: + """Track a logbook live stream.""" + + stream_queue: asyncio.Queue[Event] + subscriptions: list[CALLBACK_TYPE] + end_time_unsub: CALLBACK_TYPE | None = None + task: asyncio.Task | None = None + + @callback def async_setup(hass: HomeAssistant) -> None: """Set up the logbook websocket API.""" @@ -39,6 +55,94 @@ def async_setup(hass: HomeAssistant) -> None: websocket_api.async_register_command(hass, ws_event_stream) +async def _async_wait_for_recorder_sync(hass: HomeAssistant) -> None: + """Wait for the recorder to sync.""" + try: + await asyncio.wait_for( + get_instance(hass).async_block_till_done(), MAX_RECORDER_WAIT + ) + except asyncio.TimeoutError: + _LOGGER.debug( + "Recorder is behind more than %s seconds, starting live stream; Some results may be missing" + ) + + +async def _async_send_historical_events( + hass: HomeAssistant, + connection: ActiveConnection, + msg_id: int, + start_time: dt, + end_time: dt, + formatter: Callable[[int, Any], dict[str, Any]], + event_processor: EventProcessor, +) -> dt | None: + """Select historical data from the database and deliver it to the websocket. + + If the query is considered a big query we will split the request into + two chunks so that they get the recent events first and the select + that is expected to take a long time comes in after to ensure + they are not stuck at a loading screen and can start looking at + the data right away. + + This function returns the time of the most recent event we sent to the + websocket. + """ + is_big_query = ( + not event_processor.entity_ids + and not event_processor.device_ids + and ((end_time - start_time) > timedelta(hours=BIG_QUERY_HOURS)) + ) + + if not is_big_query: + message, last_event_time = await _async_get_ws_formatted_events( + hass, + msg_id, + start_time, + end_time, + formatter, + event_processor, + ) + # If there is no last_time, there are no historical + # results, but we still send an empty message so + # consumers of the api know their request was + # answered but there were no results + connection.send_message(message) + return last_event_time + + # This is a big query so we deliver + # the first three hours and then + # we fetch the old data + recent_query_start = end_time - timedelta(hours=BIG_QUERY_RECENT_HOURS) + recent_message, recent_query_last_event_time = await _async_get_ws_formatted_events( + hass, + msg_id, + recent_query_start, + end_time, + formatter, + event_processor, + ) + if recent_query_last_event_time: + connection.send_message(recent_message) + + older_message, older_query_last_event_time = await _async_get_ws_formatted_events( + hass, + msg_id, + start_time, + recent_query_start, + formatter, + event_processor, + ) + # If there is no last_time, there are no historical + # results, but we still send an empty message so + # consumers of the api know their request was + # answered but there were no results + if older_query_last_event_time or not recent_query_last_event_time: + connection.send_message(older_message) + + # Returns the time of the newest event + return recent_query_last_event_time or older_query_last_event_time + + async def _async_get_ws_formatted_events( hass: HomeAssistant, msg_id: int, @@ -75,14 +179,13 @@ def _ws_formatted_get_events( async def _async_events_consumer( - setup_complete_future: asyncio.Future[dt], + subscriptions_setup_complete_time: dt, connection: ActiveConnection, msg_id: int, stream_queue: asyncio.Queue[Event], event_processor: EventProcessor, ) -> None: """Stream events from the queue.""" - subscriptions_setup_complete_time = await setup_complete_future event_processor.switch_to_live() while True: @@ -116,6 +219,7 @@ async def _async_events_consumer( { vol.Required("type"): "logbook/event_stream", vol.Required("start_time"): str, + vol.Optional("end_time"): str, vol.Optional("entity_ids"): [str], vol.Optional("device_ids"): [str], } @@ -126,21 +230,32 @@ async def ws_event_stream( ) -> None: """Handle logbook stream events websocket command.""" start_time_str = msg["start_time"] + msg_id: int = msg["id"] utc_now = dt_util.utcnow() if start_time := dt_util.parse_datetime(start_time_str): start_time = dt_util.as_utc(start_time) if not start_time or start_time > utc_now: - connection.send_error(msg["id"], "invalid_start_time", "Invalid start_time") + connection.send_error(msg_id, "invalid_start_time", "Invalid start_time") return + end_time_str = msg.get("end_time") + end_time: dt | None = None + if end_time_str: + if not (end_time := dt_util.parse_datetime(end_time_str)): + connection.send_error(msg_id, "invalid_end_time", "Invalid end_time") + return + end_time = dt_util.as_utc(end_time) + if end_time < start_time: + connection.send_error(msg_id, "invalid_end_time", "Invalid end_time") + return + device_ids = msg.get("device_ids") entity_ids = msg.get("entity_ids") if entity_ids: entity_ids = async_filter_entities(hass, entity_ids) event_types = async_determine_event_types(hass, entity_ids, device_ids) - event_processor = EventProcessor( hass, event_types, @@ -151,26 +266,43 @@ async def ws_event_stream( include_entity_name=False, ) - stream_queue: asyncio.Queue[Event] = asyncio.Queue(MAX_PENDING_LOGBOOK_EVENTS) - subscriptions: list[CALLBACK_TYPE] = [] - setup_complete_future: asyncio.Future[dt] = asyncio.Future() - task = asyncio.create_task( - _async_events_consumer( - setup_complete_future, + if end_time and end_time <= utc_now: + # Not live stream but we it might be a big query + connection.subscriptions[msg_id] = callback(lambda: None) + connection.send_result(msg_id) + # Fetch everything from history + await _async_send_historical_events( + hass, connection, - msg["id"], - stream_queue, + msg_id, + start_time, + end_time, + messages.event_message, event_processor, ) + return + + subscriptions: list[CALLBACK_TYPE] = [] + stream_queue: asyncio.Queue[Event] = asyncio.Queue(MAX_PENDING_LOGBOOK_EVENTS) + live_stream = LogbookLiveStream( + subscriptions=subscriptions, stream_queue=stream_queue ) - def _unsub() -> None: + @callback + def _unsub(*time: Any) -> None: """Unsubscribe from all events.""" for subscription in subscriptions: subscription() subscriptions.clear() - if task: - task.cancel() + if live_stream.task: + live_stream.task.cancel() + if live_stream.end_time_unsub: + live_stream.end_time_unsub() + + if end_time: + live_stream.end_time_unsub = async_track_point_in_utc_time( + hass, _unsub, end_time + ) @callback def _queue_or_cancel(event: Event) -> None: @@ -188,33 +320,21 @@ async def ws_event_stream( hass, subscriptions, _queue_or_cancel, event_types, entity_ids, device_ids ) subscriptions_setup_complete_time = dt_util.utcnow() - connection.subscriptions[msg["id"]] = _unsub - connection.send_result(msg["id"]) - + connection.subscriptions[msg_id] = _unsub + connection.send_result(msg_id) # Fetch everything from history - message, last_event_time = await _async_get_ws_formatted_events( + last_event_time = await _async_send_historical_events( hass, - msg["id"], + connection, + msg_id, start_time, subscriptions_setup_complete_time, messages.event_message, event_processor, ) - # If there is no last_time there are no historical - # results, but we still send an empty message so - # consumers of the api know their request was - # answered but there were no results - connection.send_message(message) - try: - await asyncio.wait_for( - get_instance(hass).async_block_till_done(), MAX_RECORDER_WAIT - ) - except asyncio.TimeoutError: - _LOGGER.debug( - "Recorder is behind more than %s seconds, starting live stream; Some results may be missing" - ) - if setup_complete_future.cancelled(): + await _async_wait_for_recorder_sync(hass) + if not subscriptions: # Unsubscribe happened while waiting for recorder return @@ -235,7 +355,7 @@ async def ws_event_stream( ) message, final_cutoff_time = await _async_get_ws_formatted_events( hass, - msg["id"], + msg_id, second_fetch_start_time, subscriptions_setup_complete_time, messages.event_message, @@ -244,9 +364,19 @@ async def ws_event_stream( if final_cutoff_time: # Only sends results if we have them connection.send_message(message) - if not setup_complete_future.cancelled(): + if not subscriptions: # Unsubscribe happened while waiting for formatted events - setup_complete_future.set_result(subscriptions_setup_complete_time) + return + + live_stream.task = asyncio.create_task( + _async_events_consumer( + subscriptions_setup_complete_time, + connection, + msg_id, + stream_queue, + event_processor, + ) + ) @websocket_api.websocket_command( diff --git a/tests/components/logbook/test_websocket_api.py b/tests/components/logbook/test_websocket_api.py index e1fc1defe09..26ad7127296 100644 --- a/tests/components/logbook/test_websocket_api.py +++ b/tests/components/logbook/test_websocket_api.py @@ -4,6 +4,7 @@ from collections.abc import Callable from datetime import timedelta from unittest.mock import ANY, patch +from freezegun import freeze_time import pytest from homeassistant import core @@ -26,7 +27,11 @@ from homeassistant.helpers import device_registry from homeassistant.setup import async_setup_component import homeassistant.util.dt as dt_util -from tests.common import MockConfigEntry, SetupRecorderInstanceT +from tests.common import ( + MockConfigEntry, + SetupRecorderInstanceT, + async_fire_time_changed, +) from tests.components.recorder.common import ( async_block_recorder, async_recorder_block_till_done, @@ -479,12 +484,12 @@ async def test_subscribe_unsubscribe_logbook_stream( {"id": 7, "type": "logbook/event_stream", "start_time": now.isoformat()} ) - msg = await websocket_client.receive_json() + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == TYPE_RESULT assert msg["success"] - msg = await websocket_client.receive_json() + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == "event" assert msg["event"] == [ @@ -512,7 +517,7 @@ async def test_subscribe_unsubscribe_logbook_stream( hass.states.async_set("light.zulu", "on", {"effect": "help", "color": "blue"}) - msg = await websocket_client.receive_json() + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == "event" assert msg["event"] == [ @@ -552,7 +557,7 @@ async def test_subscribe_unsubscribe_logbook_stream( hass.bus.async_fire(EVENT_HOMEASSISTANT_START) await hass.async_block_till_done() - msg = await websocket_client.receive_json() + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == "event" assert msg["event"] == [ @@ -616,7 +621,7 @@ async def test_subscribe_unsubscribe_logbook_stream( await hass.async_block_till_done() - msg = await websocket_client.receive_json() + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == "event" assert msg["event"] == [ @@ -733,7 +738,7 @@ async def test_subscribe_unsubscribe_logbook_stream( await websocket_client.send_json( {"id": 8, "type": "unsubscribe_events", "subscription": 7} ) - msg = await websocket_client.receive_json() + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 8 assert msg["type"] == TYPE_RESULT @@ -775,12 +780,12 @@ async def test_subscribe_unsubscribe_logbook_stream_entities( } ) - msg = await websocket_client.receive_json() + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == TYPE_RESULT assert msg["success"] - msg = await websocket_client.receive_json() + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == "event" assert msg["event"] == [ @@ -797,7 +802,7 @@ async def test_subscribe_unsubscribe_logbook_stream_entities( await hass.async_block_till_done() - msg = await websocket_client.receive_json() + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == "event" assert msg["event"] == [ @@ -815,7 +820,258 @@ async def test_subscribe_unsubscribe_logbook_stream_entities( await websocket_client.send_json( {"id": 8, "type": "unsubscribe_events", "subscription": 7} ) - msg = await websocket_client.receive_json() + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) + + assert msg["id"] == 8 + assert msg["type"] == TYPE_RESULT + assert msg["success"] + + # Check our listener got unsubscribed + assert sum(hass.bus.async_listeners().values()) == init_count + + +@patch("homeassistant.components.logbook.websocket_api.EVENT_COALESCE_TIME", 0) +async def test_subscribe_unsubscribe_logbook_stream_entities_with_end_time( + hass, recorder_mock, hass_ws_client +): + """Test subscribe/unsubscribe logbook stream with specific entities and an end_time.""" + now = dt_util.utcnow() + await asyncio.gather( + *[ + async_setup_component(hass, comp, {}) + for comp in ("homeassistant", "logbook", "automation", "script") + ] + ) + + await hass.async_block_till_done() + init_count = sum(hass.bus.async_listeners().values()) + hass.states.async_set("light.small", STATE_ON) + hass.states.async_set("binary_sensor.is_light", STATE_ON) + hass.states.async_set("binary_sensor.is_light", STATE_OFF) + state: State = hass.states.get("binary_sensor.is_light") + await hass.async_block_till_done() + + await async_wait_recording_done(hass) + websocket_client = await hass_ws_client() + await websocket_client.send_json( + { + "id": 7, + "type": "logbook/event_stream", + "start_time": now.isoformat(), + "end_time": (now + timedelta(minutes=10)).isoformat(), + "entity_ids": ["light.small", "binary_sensor.is_light"], + } + ) + + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) + assert msg["id"] == 7 + assert msg["type"] == TYPE_RESULT + assert msg["success"] + + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) + assert msg["id"] == 7 + assert msg["type"] == "event" + assert msg["event"] == [ + { + "entity_id": "binary_sensor.is_light", + "state": "off", + "when": state.last_updated.timestamp(), + } + ] + + hass.states.async_set("light.alpha", STATE_ON) + hass.states.async_set("light.alpha", STATE_OFF) + hass.states.async_set("light.small", STATE_OFF, {"effect": "help", "color": "blue"}) + + await hass.async_block_till_done() + + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) + assert msg["id"] == 7 + assert msg["type"] == "event" + assert msg["event"] == [ + { + "entity_id": "light.small", + "state": "off", + "when": ANY, + }, + ] + + hass.states.async_remove("light.alpha") + hass.states.async_remove("light.small") + await hass.async_block_till_done() + + async_fire_time_changed(hass, now + timedelta(minutes=11)) + await hass.async_block_till_done() + + # These states should not be sent since we should be unsubscribed + hass.states.async_set("light.small", STATE_ON) + hass.states.async_set("light.small", STATE_OFF) + await hass.async_block_till_done() + + await websocket_client.send_json( + {"id": 8, "type": "unsubscribe_events", "subscription": 7} + ) + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) + + assert msg["id"] == 8 + assert msg["type"] == TYPE_RESULT + assert msg["success"] + + # Check our listener got unsubscribed + assert sum(hass.bus.async_listeners().values()) <= init_count + + +@patch("homeassistant.components.logbook.websocket_api.EVENT_COALESCE_TIME", 0) +async def test_subscribe_unsubscribe_logbook_stream_entities_past_only( + hass, recorder_mock, hass_ws_client +): + """Test subscribe/unsubscribe logbook stream with specific entities in the past.""" + now = dt_util.utcnow() + await asyncio.gather( + *[ + async_setup_component(hass, comp, {}) + for comp in ("homeassistant", "logbook", "automation", "script") + ] + ) + + await hass.async_block_till_done() + init_count = sum(hass.bus.async_listeners().values()) + hass.states.async_set("light.small", STATE_ON) + hass.states.async_set("binary_sensor.is_light", STATE_ON) + hass.states.async_set("binary_sensor.is_light", STATE_OFF) + state: State = hass.states.get("binary_sensor.is_light") + await hass.async_block_till_done() + + await async_wait_recording_done(hass) + websocket_client = await hass_ws_client() + await websocket_client.send_json( + { + "id": 7, + "type": "logbook/event_stream", + "start_time": now.isoformat(), + "end_time": (dt_util.utcnow() - timedelta(microseconds=1)).isoformat(), + "entity_ids": ["light.small", "binary_sensor.is_light"], + } + ) + + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) + assert msg["id"] == 7 + assert msg["type"] == TYPE_RESULT + assert msg["success"] + + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) + assert msg["id"] == 7 + assert msg["type"] == "event" + assert msg["event"] == [ + { + "entity_id": "binary_sensor.is_light", + "state": "off", + "when": state.last_updated.timestamp(), + } + ] + + # These states should not be sent since we should be unsubscribed + # since we only asked for the past + hass.states.async_set("light.small", STATE_ON) + hass.states.async_set("light.small", STATE_OFF) + await hass.async_block_till_done() + + await websocket_client.send_json( + {"id": 8, "type": "unsubscribe_events", "subscription": 7} + ) + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) + + assert msg["id"] == 8 + assert msg["type"] == TYPE_RESULT + assert msg["success"] + + # Check our listener got unsubscribed + assert sum(hass.bus.async_listeners().values()) == init_count + + +@patch("homeassistant.components.logbook.websocket_api.EVENT_COALESCE_TIME", 0) +async def test_subscribe_unsubscribe_logbook_stream_big_query( + hass, recorder_mock, hass_ws_client +): + """Test subscribe/unsubscribe logbook stream and ask for a large time frame. + + We should get the data for the first 24 hours in the first message, and + anything older will come in a followup message. + """ + now = dt_util.utcnow() + await asyncio.gather( + *[ + async_setup_component(hass, comp, {}) + for comp in ("homeassistant", "logbook", "automation", "script") + ] + ) + + await hass.async_block_till_done() + init_count = sum(hass.bus.async_listeners().values()) + four_days_ago = now - timedelta(days=4) + five_days_ago = now - timedelta(days=5) + + with freeze_time(four_days_ago): + hass.states.async_set("binary_sensor.four_days_ago", STATE_ON) + hass.states.async_set("binary_sensor.four_days_ago", STATE_OFF) + four_day_old_state: State = hass.states.get("binary_sensor.four_days_ago") + await hass.async_block_till_done() + + await async_wait_recording_done(hass) + # Verify our state was recorded in the past + assert (now - four_day_old_state.last_updated).total_seconds() > 86400 * 3 + + hass.states.async_set("binary_sensor.is_light", STATE_OFF) + hass.states.async_set("binary_sensor.is_light", STATE_ON) + current_state: State = hass.states.get("binary_sensor.is_light") + + # Verify our new state was recorded in the recent timeframe + assert (now - current_state.last_updated).total_seconds() < 2 + + await async_wait_recording_done(hass) + + websocket_client = await hass_ws_client() + await websocket_client.send_json( + { + "id": 7, + "type": "logbook/event_stream", + "start_time": five_days_ago.isoformat(), + } + ) + + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) + assert msg["id"] == 7 + assert msg["type"] == TYPE_RESULT + assert msg["success"] + + # With a big query we get the current state first + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) + assert msg["id"] == 7 + assert msg["type"] == "event" + assert msg["event"] == [ + { + "entity_id": "binary_sensor.is_light", + "state": "on", + "when": current_state.last_updated.timestamp(), + } + ] + + # With a big query we get the old states second + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) + assert msg["id"] == 7 + assert msg["type"] == "event" + assert msg["event"] == [ + { + "entity_id": "binary_sensor.four_days_ago", + "state": "off", + "when": four_day_old_state.last_updated.timestamp(), + } + ] + + await websocket_client.send_json( + {"id": 8, "type": "unsubscribe_events", "subscription": 7} + ) + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 8 assert msg["type"] == TYPE_RESULT @@ -853,7 +1109,7 @@ async def test_subscribe_unsubscribe_logbook_stream_device( } ) - msg = await websocket_client.receive_json() + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == TYPE_RESULT assert msg["success"] @@ -864,7 +1120,7 @@ async def test_subscribe_unsubscribe_logbook_stream_device( # and its not a failure case. This is useful # in the frontend so we can tell the user there # are no results vs waiting for them to appear - msg = await websocket_client.receive_json() + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == "event" assert msg["event"] == [] @@ -872,7 +1128,7 @@ async def test_subscribe_unsubscribe_logbook_stream_device( hass.bus.async_fire("mock_event", {"device_id": device.id}) await hass.async_block_till_done() - msg = await websocket_client.receive_json() + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == "event" assert msg["event"] == [ @@ -882,7 +1138,7 @@ async def test_subscribe_unsubscribe_logbook_stream_device( await websocket_client.send_json( {"id": 8, "type": "unsubscribe_events", "subscription": 7} ) - msg = await websocket_client.receive_json() + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 8 assert msg["type"] == TYPE_RESULT @@ -910,6 +1166,38 @@ async def test_event_stream_bad_start_time(hass, hass_ws_client, recorder_mock): assert response["error"]["code"] == "invalid_start_time" +async def test_event_stream_bad_end_time(hass, hass_ws_client, recorder_mock): + """Test event_stream bad end time.""" + await async_setup_component(hass, "logbook", {}) + await async_recorder_block_till_done(hass) + utc_now = dt_util.utcnow() + + client = await hass_ws_client() + await client.send_json( + { + "id": 1, + "type": "logbook/event_stream", + "start_time": utc_now.isoformat(), + "end_time": "cats", + } + ) + response = await client.receive_json() + assert not response["success"] + assert response["error"]["code"] == "invalid_end_time" + + await client.send_json( + { + "id": 2, + "type": "logbook/event_stream", + "start_time": utc_now.isoformat(), + "end_time": (utc_now - timedelta(hours=5)).isoformat(), + } + ) + response = await client.receive_json() + assert not response["success"] + assert response["error"]["code"] == "invalid_end_time" + + async def test_live_stream_with_one_second_commit_interval( hass: HomeAssistant, async_setup_recorder_instance: SetupRecorderInstanceT, @@ -951,7 +1239,7 @@ async def test_live_stream_with_one_second_commit_interval( ) hass.bus.async_fire("mock_event", {"device_id": device.id, "message": "4"}) - msg = await websocket_client.receive_json() + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == TYPE_RESULT assert msg["success"] @@ -959,7 +1247,7 @@ async def test_live_stream_with_one_second_commit_interval( hass.bus.async_fire("mock_event", {"device_id": device.id, "message": "5"}) recieved_rows = [] - msg = await websocket_client.receive_json() + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == "event" recieved_rows.extend(msg["event"]) @@ -990,7 +1278,7 @@ async def test_live_stream_with_one_second_commit_interval( await websocket_client.send_json( {"id": 8, "type": "unsubscribe_events", "subscription": 7} ) - msg = await websocket_client.receive_json() + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 8 assert msg["type"] == TYPE_RESULT @@ -1030,12 +1318,12 @@ async def test_subscribe_disconnected(hass, recorder_mock, hass_ws_client): } ) - msg = await websocket_client.receive_json() + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == TYPE_RESULT assert msg["success"] - msg = await websocket_client.receive_json() + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == "event" assert msg["event"] == [ @@ -1088,7 +1376,7 @@ async def test_stream_consumer_stop_processing(hass, recorder_mock, hass_ws_clie ) await async_wait_recording_done(hass) - msg = await websocket_client.receive_json() + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == TYPE_RESULT assert msg["success"] @@ -1135,7 +1423,7 @@ async def test_recorder_is_far_behind(hass, recorder_mock, hass_ws_client, caplo } ) - msg = await websocket_client.receive_json() + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == TYPE_RESULT assert msg["success"] @@ -1146,7 +1434,7 @@ async def test_recorder_is_far_behind(hass, recorder_mock, hass_ws_client, caplo # and its not a failure case. This is useful # in the frontend so we can tell the user there # are no results vs waiting for them to appear - msg = await websocket_client.receive_json() + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == "event" assert msg["event"] == [] @@ -1154,7 +1442,7 @@ async def test_recorder_is_far_behind(hass, recorder_mock, hass_ws_client, caplo hass.bus.async_fire("mock_event", {"device_id": device.id, "message": "1"}) await hass.async_block_till_done() - msg = await websocket_client.receive_json() + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == "event" assert msg["event"] == [ @@ -1164,7 +1452,7 @@ async def test_recorder_is_far_behind(hass, recorder_mock, hass_ws_client, caplo hass.bus.async_fire("mock_event", {"device_id": device.id, "message": "2"}) await hass.async_block_till_done() - msg = await websocket_client.receive_json() + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 7 assert msg["type"] == "event" assert msg["event"] == [ @@ -1174,7 +1462,7 @@ async def test_recorder_is_far_behind(hass, recorder_mock, hass_ws_client, caplo await websocket_client.send_json( {"id": 8, "type": "unsubscribe_events", "subscription": 7} ) - msg = await websocket_client.receive_json() + msg = await asyncio.wait_for(websocket_client.receive_json(), 2) assert msg["id"] == 8 assert msg["type"] == TYPE_RESULT