diff --git a/homeassistant/components/history/__init__.py b/homeassistant/components/history/__init__.py index f07fe82b50d..6170d40e42b 100644 --- a/homeassistant/components/history/__init__.py +++ b/homeassistant/components/history/__init__.py @@ -1,7 +1,9 @@ """Provide pre-made queries on top of the recorder component.""" from __future__ import annotations -from collections.abc import Iterable +import asyncio +from collections.abc import Callable, Iterable, MutableMapping +from dataclasses import dataclass from datetime import datetime as dt, timedelta from http import HTTPStatus import logging @@ -13,16 +15,45 @@ import voluptuous as vol from homeassistant.components import frontend, websocket_api from homeassistant.components.http import HomeAssistantView -from homeassistant.components.recorder import get_instance, history +from homeassistant.components.recorder import ( + DOMAIN as RECORDER_DOMAIN, + get_instance, + history, +) from homeassistant.components.recorder.filters import ( Filters, + extract_include_exclude_filter_conf, + merge_include_exclude_filters, sqlalchemy_filter_from_include_exclude_conf, ) from homeassistant.components.recorder.util import session_scope from homeassistant.components.websocket_api import messages -from homeassistant.core import HomeAssistant +from homeassistant.components.websocket_api.connection import ActiveConnection +from homeassistant.const import ( + COMPRESSED_STATE_ATTRIBUTES, + COMPRESSED_STATE_LAST_CHANGED, + COMPRESSED_STATE_LAST_UPDATED, + COMPRESSED_STATE_STATE, + EVENT_STATE_CHANGED, +) +from homeassistant.core import ( + CALLBACK_TYPE, + Event, + HomeAssistant, + State, + callback, + is_callback, +) import homeassistant.helpers.config_validation as cv -from homeassistant.helpers.entityfilter import INCLUDE_EXCLUDE_BASE_FILTER_SCHEMA +from homeassistant.helpers.entityfilter import ( + INCLUDE_EXCLUDE_BASE_FILTER_SCHEMA, + EntityFilter, + convert_include_exclude_filter, +) +from homeassistant.helpers.event import ( + async_track_point_in_utc_time, + async_track_state_change_event, +) from homeassistant.helpers.json import JSON_DUMP from homeassistant.helpers.typing import ConfigType import homeassistant.util.dt as dt_util @@ -31,10 +62,12 @@ _LOGGER = logging.getLogger(__name__) DOMAIN = "history" HISTORY_FILTERS = "history_filters" +HISTORY_ENTITIES_FILTER = "history_entities_filter" HISTORY_USE_INCLUDE_ORDER = "history_use_include_order" +EVENT_COALESCE_TIME = 0.35 CONF_ORDER = "use_include_order" - +MAX_PENDING_HISTORY_STATES = 2048 CONFIG_SCHEMA = vol.Schema( { @@ -46,18 +79,43 @@ CONFIG_SCHEMA = vol.Schema( ) +@dataclass +class HistoryLiveStream: + """Track a history live stream.""" + + stream_queue: asyncio.Queue[Event] + subscriptions: list[CALLBACK_TYPE] + end_time_unsub: CALLBACK_TYPE | None = None + task: asyncio.Task | None = None + wait_sync_task: asyncio.Task | None = None + + async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: """Set up the history hooks.""" conf = config.get(DOMAIN, {}) + recorder_conf = config.get(RECORDER_DOMAIN, {}) + history_conf = config.get(DOMAIN, {}) + recorder_filter = extract_include_exclude_filter_conf(recorder_conf) + logbook_filter = extract_include_exclude_filter_conf(history_conf) + merged_filter = merge_include_exclude_filters(recorder_filter, logbook_filter) + + possible_merged_entities_filter = convert_include_exclude_filter(merged_filter) + + if not possible_merged_entities_filter.empty_filter: + hass.data[ + HISTORY_FILTERS + ] = filters = sqlalchemy_filter_from_include_exclude_conf(conf) + hass.data[HISTORY_ENTITIES_FILTER] = possible_merged_entities_filter + else: + hass.data[HISTORY_FILTERS] = filters = None + hass.data[HISTORY_ENTITIES_FILTER] = None - hass.data[HISTORY_FILTERS] = filters = sqlalchemy_filter_from_include_exclude_conf( - conf - ) hass.data[HISTORY_USE_INCLUDE_ORDER] = use_include_order = conf.get(CONF_ORDER) hass.http.register_view(HistoryPeriodView(filters, use_include_order)) frontend.async_register_built_in_panel(hass, "history", "history", "hass:chart-box") websocket_api.async_register_command(hass, ws_get_history_during_period) + websocket_api.async_register_command(hass, ws_stream) return True @@ -146,17 +204,19 @@ async def ws_get_history_during_period( entity_ids = msg.get("entity_ids") include_start_time_state = msg["include_start_time_state"] + no_attributes = msg["no_attributes"] if ( not include_start_time_state and entity_ids - and not _entities_may_have_state_changes_after(hass, entity_ids, start_time) + and not _entities_may_have_state_changes_after( + hass, entity_ids, start_time, no_attributes + ) ): connection.send_result(msg["id"], {}) return significant_changes_only = msg["significant_changes_only"] - no_attributes = msg["no_attributes"] minimal_response = msg["minimal_response"] connection.send_message( @@ -232,7 +292,9 @@ class HistoryPeriodView(HomeAssistantView): if ( not include_start_time_state and entity_ids - and not _entities_may_have_state_changes_after(hass, entity_ids, start_time) + and not _entities_may_have_state_changes_after( + hass, entity_ids, start_time, no_attributes + ) ): return self.json([]) @@ -300,13 +362,409 @@ class HistoryPeriodView(HomeAssistantView): def _entities_may_have_state_changes_after( - hass: HomeAssistant, entity_ids: Iterable, start_time: dt + hass: HomeAssistant, entity_ids: Iterable, start_time: dt, no_attributes: bool ) -> bool: """Check the state machine to see if entities have changed since start time.""" for entity_id in entity_ids: state = hass.states.get(entity_id) + if state is None: + return True - if state is None or state.last_changed > start_time: + state_time = state.last_changed if no_attributes else state.last_updated + if state_time > start_time: return True return False + + +def _generate_stream_message( + states: MutableMapping[str, list[dict[str, Any]]], + start_day: dt, + end_day: dt, +) -> dict[str, Any]: + """Generate a history stream message response.""" + return { + "states": states, + "start_time": dt_util.utc_to_timestamp(start_day), + "end_time": dt_util.utc_to_timestamp(end_day), + } + + +@callback +def _async_send_empty_response( + connection: ActiveConnection, msg_id: int, start_time: dt, end_time: dt | None +) -> None: + """Send an empty response when we know all results are filtered away.""" + connection.send_result(msg_id) + stream_end_time = end_time or dt_util.utcnow() + _async_send_response(connection, msg_id, start_time, stream_end_time, {}) + + +@callback +def _async_send_response( + connection: ActiveConnection, + msg_id: int, + start_time: dt, + end_time: dt, + states: MutableMapping[str, list[dict[str, Any]]], +) -> None: + """Send a response.""" + empty_stream_message = _generate_stream_message(states, start_time, end_time) + empty_response = messages.event_message(msg_id, empty_stream_message) + connection.send_message(JSON_DUMP(empty_response)) + + +async def _async_send_historical_states( + hass: HomeAssistant, + connection: ActiveConnection, + msg_id: int, + start_time: dt, + end_time: dt, + entity_ids: list[str] | None, + filters: Filters | None, + include_start_time_state: bool, + significant_changes_only: bool, + minimal_response: bool, + no_attributes: bool, + send_empty: bool, +) -> dt | None: + """Fetch history significant_states and send them to the client.""" + states = cast( + MutableMapping[str, list[dict[str, Any]]], + await get_instance(hass).async_add_executor_job( + history.get_significant_states, + hass, + start_time, + end_time, + entity_ids, + filters, + include_start_time_state, + significant_changes_only, + minimal_response, + no_attributes, + True, + ), + ) + last_time = 0 + + for state_list in states.values(): + if ( + state_list + and (state_last_time := state_list[-1][COMPRESSED_STATE_LAST_UPDATED]) + > last_time + ): + last_time = state_last_time + + if last_time == 0: + # If we did not send any states ever, we need to send an empty response + # so the websocket client knows it should render/process/consume the + # data. + if not send_empty: + return None + last_time_dt = end_time + else: + last_time_dt = dt_util.utc_from_timestamp(last_time) + _async_send_response(connection, msg_id, start_time, last_time_dt, states) + return last_time_dt if last_time != 0 else None + + +def _history_compressed_state(state: State, no_attributes: bool) -> dict[str, Any]: + """Convert a state to a compressed state.""" + comp_state: dict[str, Any] = {COMPRESSED_STATE_STATE: state.state} + if not no_attributes or state.domain in history.NEED_ATTRIBUTE_DOMAINS: + comp_state[COMPRESSED_STATE_ATTRIBUTES] = state.attributes + comp_state[COMPRESSED_STATE_LAST_UPDATED] = dt_util.utc_to_timestamp( + state.last_updated + ) + if state.last_changed != state.last_updated: + comp_state[COMPRESSED_STATE_LAST_CHANGED] = dt_util.utc_to_timestamp( + state.last_changed + ) + return comp_state + + +def _events_to_compressed_states( + events: Iterable[Event], no_attributes: bool +) -> MutableMapping[str, list[dict[str, Any]]]: + """Convert events to a compressed states.""" + states_by_entity_ids: dict[str, list[dict[str, Any]]] = {} + for event in events: + state: State = event.data["new_state"] + entity_id: str = state.entity_id + states_by_entity_ids.setdefault(entity_id, []).append( + _history_compressed_state(state, no_attributes) + ) + return states_by_entity_ids + + +async def _async_events_consumer( + subscriptions_setup_complete_time: dt, + connection: ActiveConnection, + msg_id: int, + stream_queue: asyncio.Queue[Event], + no_attributes: bool, +) -> None: + """Stream events from the queue.""" + while True: + events: list[Event] = [await stream_queue.get()] + # If the event is older than the last db + # event we already sent it so we skip it. + if events[0].time_fired <= subscriptions_setup_complete_time: + continue + # We sleep for the EVENT_COALESCE_TIME so + # we can group events together to minimize + # the number of websocket messages when the + # system is overloaded with an event storm + await asyncio.sleep(EVENT_COALESCE_TIME) + while not stream_queue.empty(): + events.append(stream_queue.get_nowait()) + + if history_states := _events_to_compressed_states(events, no_attributes): + connection.send_message( + JSON_DUMP( + messages.event_message( + msg_id, + {"states": history_states}, + ) + ) + ) + + +@callback +def _async_subscribe_events( + hass: HomeAssistant, + subscriptions: list[CALLBACK_TYPE], + target: Callable[[Event], None], + entities_filter: EntityFilter | None, + entity_ids: list[str] | None, + significant_changes_only: bool, + minimal_response: bool, +) -> None: + """Subscribe to events for the entities and devices or all. + + These are the events we need to listen for to do + the live history stream. + """ + assert is_callback(target), "target must be a callback" + + @callback + def _forward_state_events_filtered(event: Event) -> None: + """Filter state events and forward them.""" + if (new_state := event.data.get("new_state")) is None or ( + old_state := event.data.get("old_state") + ) is None: + return + assert isinstance(new_state, State) + assert isinstance(old_state, State) + if (entities_filter and not entities_filter(new_state.entity_id)) or ( + (significant_changes_only or minimal_response) + and new_state.state == old_state.state + and new_state.domain not in history.SIGNIFICANT_DOMAINS + ): + return + target(event) + + if entity_ids: + subscriptions.append( + async_track_state_change_event( + hass, entity_ids, _forward_state_events_filtered + ) + ) + return + + # We want the firehose + subscriptions.append( + hass.bus.async_listen( + EVENT_STATE_CHANGED, + _forward_state_events_filtered, + run_immediately=True, + ) + ) + + +@websocket_api.websocket_command( + { + vol.Required("type"): "history/stream", + vol.Required("start_time"): str, + vol.Optional("end_time"): str, + vol.Optional("entity_ids"): [str], + vol.Optional("include_start_time_state", default=True): bool, + vol.Optional("significant_changes_only", default=True): bool, + vol.Optional("minimal_response", default=False): bool, + vol.Optional("no_attributes", default=False): bool, + } +) +@websocket_api.async_response +async def ws_stream( + hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any] +) -> None: + """Handle history stream websocket command.""" + start_time_str = msg["start_time"] + msg_id: int = msg["id"] + entity_ids: list[str] | None = msg.get("entity_ids") + utc_now = dt_util.utcnow() + filters: Filters | None = None + entities_filter: EntityFilter | None = None + if not entity_ids: + filters = hass.data[HISTORY_FILTERS] + entities_filter = hass.data[HISTORY_ENTITIES_FILTER] + + if start_time := dt_util.parse_datetime(start_time_str): + start_time = dt_util.as_utc(start_time) + + if not start_time or start_time > utc_now: + connection.send_error(msg_id, "invalid_start_time", "Invalid start_time") + return + + end_time_str = msg.get("end_time") + end_time: dt | None = None + if end_time_str: + if not (end_time := dt_util.parse_datetime(end_time_str)): + connection.send_error(msg_id, "invalid_end_time", "Invalid end_time") + return + end_time = dt_util.as_utc(end_time) + if end_time < start_time: + connection.send_error(msg_id, "invalid_end_time", "Invalid end_time") + return + + entity_ids = msg.get("entity_ids") + include_start_time_state = msg["include_start_time_state"] + significant_changes_only = msg["significant_changes_only"] + no_attributes = msg["no_attributes"] + minimal_response = msg["minimal_response"] + + if end_time and end_time <= utc_now: + if ( + not include_start_time_state + and entity_ids + and not _entities_may_have_state_changes_after( + hass, entity_ids, start_time, no_attributes + ) + ): + _async_send_empty_response(connection, msg_id, start_time, end_time) + return + + connection.subscriptions[msg_id] = callback(lambda: None) + connection.send_result(msg_id) + await _async_send_historical_states( + hass, + connection, + msg_id, + start_time, + end_time, + entity_ids, + filters, + include_start_time_state, + significant_changes_only, + minimal_response, + no_attributes, + True, + ) + return + + subscriptions: list[CALLBACK_TYPE] = [] + stream_queue: asyncio.Queue[Event] = asyncio.Queue(MAX_PENDING_HISTORY_STATES) + live_stream = HistoryLiveStream( + subscriptions=subscriptions, stream_queue=stream_queue + ) + + @callback + def _unsub(*_utc_time: Any) -> None: + """Unsubscribe from all events.""" + for subscription in subscriptions: + subscription() + subscriptions.clear() + if live_stream.task: + live_stream.task.cancel() + if live_stream.wait_sync_task: + live_stream.wait_sync_task.cancel() + if live_stream.end_time_unsub: + live_stream.end_time_unsub() + live_stream.end_time_unsub = None + + if end_time: + live_stream.end_time_unsub = async_track_point_in_utc_time( + hass, _unsub, end_time + ) + + @callback + def _queue_or_cancel(event: Event) -> None: + """Queue an event to be processed or cancel.""" + try: + stream_queue.put_nowait(event) + except asyncio.QueueFull: + _LOGGER.debug( + "Client exceeded max pending messages of %s", + MAX_PENDING_HISTORY_STATES, + ) + _unsub() + + _async_subscribe_events( + hass, + subscriptions, + _queue_or_cancel, + entities_filter, + entity_ids, + significant_changes_only=significant_changes_only, + minimal_response=minimal_response, + ) + subscriptions_setup_complete_time = dt_util.utcnow() + connection.subscriptions[msg_id] = _unsub + connection.send_result(msg_id) + # Fetch everything from history + last_event_time = await _async_send_historical_states( + hass, + connection, + msg_id, + start_time, + subscriptions_setup_complete_time, + entity_ids, + filters, + include_start_time_state, + significant_changes_only, + minimal_response, + no_attributes, + True, + ) + + if msg_id not in connection.subscriptions: + # Unsubscribe happened while sending historical states + return + + live_stream.task = asyncio.create_task( + _async_events_consumer( + subscriptions_setup_complete_time, + connection, + msg_id, + stream_queue, + no_attributes, + ) + ) + + live_stream.wait_sync_task = asyncio.create_task( + get_instance(hass).async_block_till_done() + ) + await live_stream.wait_sync_task + + # + # Fetch any states from the database that have + # not been committed since the original fetch + # so we can switch over to using the subscriptions + # + # We only want states that happened after the last state + # we had from the last database query + # + await _async_send_historical_states( + hass, + connection, + msg_id, + last_event_time or start_time, + subscriptions_setup_complete_time, + entity_ids, + filters, + False, # We don't want the start time state again + significant_changes_only, + minimal_response, + no_attributes, + send_empty=not last_event_time, + ) diff --git a/tests/components/history/test_init.py b/tests/components/history/test_init.py index f0c4da26231..777f5cfb8bf 100644 --- a/tests/components/history/test_init.py +++ b/tests/components/history/test_init.py @@ -5,17 +5,26 @@ from http import HTTPStatus import json from unittest.mock import patch, sentinel +import async_timeout +from freezegun import freeze_time import pytest from homeassistant.components import history from homeassistant.components.recorder.history import get_significant_states from homeassistant.components.recorder.models import process_timestamp -from homeassistant.const import CONF_DOMAINS, CONF_ENTITIES, CONF_EXCLUDE, CONF_INCLUDE +from homeassistant.const import ( + CONF_DOMAINS, + CONF_ENTITIES, + CONF_EXCLUDE, + CONF_INCLUDE, + EVENT_HOMEASSISTANT_FINAL_WRITE, +) import homeassistant.core as ha from homeassistant.helpers.json import JSONEncoder from homeassistant.setup import async_setup_component import homeassistant.util.dt as dt_util +from tests.common import async_fire_time_changed from tests.components.recorder.common import ( async_recorder_block_till_done, async_wait_recording_done, @@ -23,6 +32,15 @@ from tests.components.recorder.common import ( ) +def listeners_without_writes(listeners: dict[str, int]) -> dict[str, int]: + """Return listeners without final write listeners since we are not testing for these.""" + return { + key: value + for key, value in listeners.items() + if key != EVENT_HOMEASSISTANT_FINAL_WRITE + } + + @pytest.mark.usefixtures("hass_history") def test_setup(): """Test setup method of history.""" @@ -1299,3 +1317,1118 @@ async def test_history_during_period_with_use_include_order( *sort_order, "sensor.three", ] + + +async def test_history_stream_historical_only(recorder_mock, hass, hass_ws_client): + """Test history stream.""" + now = dt_util.utcnow() + sort_order = ["sensor.two", "sensor.four", "sensor.one"] + await async_setup_component( + hass, + "history", + { + history.DOMAIN: { + history.CONF_ORDER: True, + CONF_INCLUDE: { + CONF_ENTITIES: sort_order, + CONF_DOMAINS: ["sensor"], + }, + } + }, + ) + await async_setup_component(hass, "sensor", {}) + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.one", "on", attributes={"any": "attr"}) + sensor_one_last_updated = hass.states.get("sensor.one").last_updated + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.two", "off", attributes={"any": "attr"}) + sensor_two_last_updated = hass.states.get("sensor.two").last_updated + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.three", "off", attributes={"any": "changed"}) + sensor_three_last_updated = hass.states.get("sensor.three").last_updated + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.four", "off", attributes={"any": "again"}) + sensor_four_last_updated = hass.states.get("sensor.four").last_updated + await async_recorder_block_till_done(hass) + hass.states.async_set("switch.excluded", "off", attributes={"any": "again"}) + await async_wait_recording_done(hass) + + await async_wait_recording_done(hass) + end_time = dt_util.utcnow() + + client = await hass_ws_client() + await client.send_json( + { + "id": 1, + "type": "history/stream", + "start_time": now.isoformat(), + "end_time": end_time.isoformat(), + "include_start_time_state": True, + "significant_changes_only": False, + "no_attributes": True, + "minimal_response": True, + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 1 + assert response["type"] == "result" + + response = await client.receive_json() + + assert response == { + "event": { + "end_time": sensor_four_last_updated.timestamp(), + "start_time": now.timestamp(), + "states": { + "sensor.four": [ + {"a": {}, "lu": sensor_four_last_updated.timestamp(), "s": "off"} + ], + "sensor.one": [ + {"a": {}, "lu": sensor_one_last_updated.timestamp(), "s": "on"} + ], + "sensor.three": [ + {"a": {}, "lu": sensor_three_last_updated.timestamp(), "s": "off"} + ], + "sensor.two": [ + {"a": {}, "lu": sensor_two_last_updated.timestamp(), "s": "off"} + ], + }, + }, + "id": 1, + "type": "event", + } + + +async def test_history_stream_significant_domain_historical_only( + recorder_mock, hass, hass_ws_client +): + """Test the stream with climate domain with historical states only.""" + now = dt_util.utcnow() + + await async_setup_component(hass, "history", {}) + await async_setup_component(hass, "sensor", {}) + await async_recorder_block_till_done(hass) + hass.states.async_set("climate.test", "on", attributes={"temperature": "1"}) + await async_recorder_block_till_done(hass) + hass.states.async_set("climate.test", "off", attributes={"temperature": "2"}) + await async_recorder_block_till_done(hass) + hass.states.async_set("climate.test", "off", attributes={"temperature": "3"}) + await async_recorder_block_till_done(hass) + hass.states.async_set("climate.test", "off", attributes={"temperature": "4"}) + await async_recorder_block_till_done(hass) + hass.states.async_set("climate.test", "on", attributes={"temperature": "5"}) + await async_wait_recording_done(hass) + + await async_wait_recording_done(hass) + + client = await hass_ws_client() + await client.send_json( + { + "id": 1, + "type": "history/stream", + "start_time": now.isoformat(), + "end_time": now.isoformat(), + "entity_ids": ["climate.test"], + "include_start_time_state": True, + "significant_changes_only": False, + "no_attributes": True, + } + ) + async with async_timeout.timeout(3): + response = await client.receive_json() + assert response["success"] + assert response["id"] == 1 + assert response["type"] == "result" + async with async_timeout.timeout(3): + response = await client.receive_json() + assert response == { + "event": { + "end_time": now.timestamp(), + "start_time": now.timestamp(), + "states": {}, + }, + "id": 1, + "type": "event", + } + + end_time = dt_util.utcnow() + await client.send_json( + { + "id": 2, + "type": "history/stream", + "start_time": now.isoformat(), + "end_time": end_time.isoformat(), + "entity_ids": ["climate.test"], + "include_start_time_state": True, + "significant_changes_only": False, + "no_attributes": True, + "minimal_response": True, + } + ) + async with async_timeout.timeout(3): + response = await client.receive_json() + assert response["success"] + assert response["id"] == 2 + assert response["type"] == "result" + + async with async_timeout.timeout(3): + response = await client.receive_json() + sensor_test_history = response["event"]["states"]["climate.test"] + assert len(sensor_test_history) == 5 + + assert sensor_test_history[0]["s"] == "on" + assert sensor_test_history[0]["a"] == {} + assert isinstance(sensor_test_history[0]["lu"], float) + assert "lc" not in sensor_test_history[0] # skipped if the same a last_updated (lu) + + assert "a" in sensor_test_history[1] + assert sensor_test_history[1]["s"] == "off" + assert "lc" not in sensor_test_history[1] # skipped if the same a last_updated (lu) + + assert sensor_test_history[4]["s"] == "on" + assert sensor_test_history[4]["a"] == {} + + await client.send_json( + { + "id": 3, + "type": "history/stream", + "start_time": now.isoformat(), + "end_time": end_time.isoformat(), + "entity_ids": ["climate.test"], + "include_start_time_state": True, + "significant_changes_only": False, + "no_attributes": False, + } + ) + async with async_timeout.timeout(3): + response = await client.receive_json() + assert response["success"] + assert response["id"] == 3 + assert response["type"] == "result" + + async with async_timeout.timeout(3): + response = await client.receive_json() + sensor_test_history = response["event"]["states"]["climate.test"] + + assert len(sensor_test_history) == 5 + + assert sensor_test_history[0]["s"] == "on" + assert sensor_test_history[0]["a"] == {"temperature": "1"} + assert isinstance(sensor_test_history[0]["lu"], float) + assert "lc" not in sensor_test_history[0] # skipped if the same a last_updated (lu) + + assert sensor_test_history[1]["s"] == "off" + assert isinstance(sensor_test_history[1]["lu"], float) + assert "lc" not in sensor_test_history[1] # skipped if the same a last_updated (lu) + assert sensor_test_history[1]["a"] == {"temperature": "2"} + + assert sensor_test_history[4]["s"] == "on" + assert sensor_test_history[4]["a"] == {"temperature": "5"} + + await client.send_json( + { + "id": 4, + "type": "history/stream", + "start_time": now.isoformat(), + "end_time": end_time.isoformat(), + "entity_ids": ["climate.test"], + "include_start_time_state": True, + "significant_changes_only": True, + "no_attributes": False, + } + ) + async with async_timeout.timeout(3): + response = await client.receive_json() + assert response["success"] + assert response["id"] == 4 + assert response["type"] == "result" + + async with async_timeout.timeout(3): + response = await client.receive_json() + sensor_test_history = response["event"]["states"]["climate.test"] + + assert len(sensor_test_history) == 5 + + assert sensor_test_history[0]["s"] == "on" + assert sensor_test_history[0]["a"] == {"temperature": "1"} + assert isinstance(sensor_test_history[0]["lu"], float) + assert "lc" not in sensor_test_history[0] # skipped if the same a last_updated (lu) + + assert sensor_test_history[1]["s"] == "off" + assert isinstance(sensor_test_history[1]["lu"], float) + assert "lc" not in sensor_test_history[1] # skipped if the same a last_updated (lu) + assert sensor_test_history[1]["a"] == {"temperature": "2"} + + assert sensor_test_history[2]["s"] == "off" + assert sensor_test_history[2]["a"] == {"temperature": "3"} + + assert sensor_test_history[3]["s"] == "off" + assert sensor_test_history[3]["a"] == {"temperature": "4"} + + assert sensor_test_history[4]["s"] == "on" + assert sensor_test_history[4]["a"] == {"temperature": "5"} + + # Test we impute the state time state + later = dt_util.utcnow() + await client.send_json( + { + "id": 5, + "type": "history/stream", + "start_time": later.isoformat(), + "end_time": later.isoformat(), + "entity_ids": ["climate.test"], + "include_start_time_state": True, + "significant_changes_only": True, + "no_attributes": False, + } + ) + async with async_timeout.timeout(3): + response = await client.receive_json() + assert response["success"] + assert response["id"] == 5 + assert response["type"] == "result" + + async with async_timeout.timeout(3): + response = await client.receive_json() + sensor_test_history = response["event"]["states"]["climate.test"] + + assert len(sensor_test_history) == 1 + + assert sensor_test_history[0]["s"] == "on" + assert sensor_test_history[0]["a"] == {"temperature": "5"} + assert sensor_test_history[0]["lu"] == later.timestamp() + assert "lc" not in sensor_test_history[0] # skipped if the same a last_updated (lu) + + +async def test_history_stream_bad_start_time(recorder_mock, hass, hass_ws_client): + """Test history stream bad state time.""" + await async_setup_component( + hass, + "history", + {"history": {}}, + ) + + client = await hass_ws_client() + await client.send_json( + { + "id": 1, + "type": "history/stream", + "start_time": "cats", + } + ) + response = await client.receive_json() + assert not response["success"] + assert response["error"]["code"] == "invalid_start_time" + + +async def test_history_stream_end_time_before_start_time( + recorder_mock, hass, hass_ws_client +): + """Test history stream with an end_time before the start_time.""" + end_time = dt_util.utcnow() - timedelta(seconds=2) + start_time = dt_util.utcnow() - timedelta(seconds=1) + + await async_setup_component( + hass, + "history", + {"history": {}}, + ) + + client = await hass_ws_client() + await client.send_json( + { + "id": 1, + "type": "history/stream", + "start_time": start_time.isoformat(), + "end_time": end_time.isoformat(), + } + ) + response = await client.receive_json() + assert not response["success"] + assert response["error"]["code"] == "invalid_end_time" + + +async def test_history_stream_bad_end_time(recorder_mock, hass, hass_ws_client): + """Test history stream bad end time.""" + now = dt_util.utcnow() + + await async_setup_component( + hass, + "history", + {"history": {}}, + ) + + client = await hass_ws_client() + await client.send_json( + { + "id": 1, + "type": "history/stream", + "start_time": now.isoformat(), + "end_time": "dogs", + } + ) + response = await client.receive_json() + assert not response["success"] + assert response["error"]["code"] == "invalid_end_time" + + +async def test_history_stream_live_no_attributes_minimal_response( + recorder_mock, hass, hass_ws_client +): + """Test history stream with history and live data and no_attributes and minimal_response.""" + now = dt_util.utcnow() + sort_order = ["sensor.two", "sensor.four", "sensor.one"] + await async_setup_component( + hass, + "history", + { + history.DOMAIN: { + history.CONF_ORDER: True, + CONF_INCLUDE: { + CONF_ENTITIES: sort_order, + CONF_DOMAINS: ["sensor"], + }, + } + }, + ) + await async_setup_component(hass, "sensor", {}) + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.one", "on", attributes={"any": "attr"}) + sensor_one_last_updated = hass.states.get("sensor.one").last_updated + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.two", "off", attributes={"any": "attr"}) + sensor_two_last_updated = hass.states.get("sensor.two").last_updated + await async_recorder_block_till_done(hass) + hass.states.async_set("switch.excluded", "off", attributes={"any": "again"}) + await async_wait_recording_done(hass) + + await async_wait_recording_done(hass) + + client = await hass_ws_client() + await client.send_json( + { + "id": 1, + "type": "history/stream", + "start_time": now.isoformat(), + "include_start_time_state": True, + "significant_changes_only": False, + "no_attributes": True, + "minimal_response": True, + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 1 + assert response["type"] == "result" + + response = await client.receive_json() + first_end_time = sensor_two_last_updated.timestamp() + + assert response == { + "event": { + "end_time": first_end_time, + "start_time": now.timestamp(), + "states": { + "sensor.one": [ + {"a": {}, "lu": sensor_one_last_updated.timestamp(), "s": "on"} + ], + "sensor.two": [ + {"a": {}, "lu": sensor_two_last_updated.timestamp(), "s": "off"} + ], + }, + }, + "id": 1, + "type": "event", + } + + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.one", "one", attributes={"any": "attr"}) + hass.states.async_set("sensor.two", "two", attributes={"any": "attr"}) + await async_recorder_block_till_done(hass) + + sensor_one_last_updated = hass.states.get("sensor.one").last_updated + sensor_two_last_updated = hass.states.get("sensor.two").last_updated + response = await client.receive_json() + assert response == { + "event": { + "states": { + "sensor.one": [{"lu": sensor_one_last_updated.timestamp(), "s": "one"}], + "sensor.two": [{"lu": sensor_two_last_updated.timestamp(), "s": "two"}], + }, + }, + "id": 1, + "type": "event", + } + + +async def test_history_stream_live(recorder_mock, hass, hass_ws_client): + """Test history stream with history and live data.""" + now = dt_util.utcnow() + sort_order = ["sensor.two", "sensor.four", "sensor.one"] + await async_setup_component( + hass, + "history", + { + history.DOMAIN: { + history.CONF_ORDER: True, + CONF_INCLUDE: { + CONF_ENTITIES: sort_order, + CONF_DOMAINS: ["sensor"], + }, + } + }, + ) + await async_setup_component(hass, "sensor", {}) + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.one", "on", attributes={"any": "attr"}) + sensor_one_last_updated = hass.states.get("sensor.one").last_updated + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.two", "off", attributes={"any": "attr"}) + sensor_two_last_updated = hass.states.get("sensor.two").last_updated + await async_recorder_block_till_done(hass) + hass.states.async_set("switch.excluded", "off", attributes={"any": "again"}) + await async_wait_recording_done(hass) + + await async_wait_recording_done(hass) + + client = await hass_ws_client() + await client.send_json( + { + "id": 1, + "type": "history/stream", + "start_time": now.isoformat(), + "include_start_time_state": True, + "significant_changes_only": False, + "no_attributes": False, + "minimal_response": False, + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 1 + assert response["type"] == "result" + + response = await client.receive_json() + first_end_time = sensor_two_last_updated.timestamp() + + assert response == { + "event": { + "end_time": first_end_time, + "start_time": now.timestamp(), + "states": { + "sensor.one": [ + { + "a": {"any": "attr"}, + "lu": sensor_one_last_updated.timestamp(), + "s": "on", + } + ], + "sensor.two": [ + { + "a": {"any": "attr"}, + "lu": sensor_two_last_updated.timestamp(), + "s": "off", + } + ], + }, + }, + "id": 1, + "type": "event", + } + + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.one", "on", attributes={"diff": "attr"}) + hass.states.async_set("sensor.two", "two", attributes={"any": "attr"}) + await async_recorder_block_till_done(hass) + + sensor_one_last_updated = hass.states.get("sensor.one").last_updated + sensor_one_last_changed = hass.states.get("sensor.one").last_changed + sensor_two_last_updated = hass.states.get("sensor.two").last_updated + response = await client.receive_json() + assert response == { + "event": { + "states": { + "sensor.one": [ + { + "lc": sensor_one_last_changed.timestamp(), + "lu": sensor_one_last_updated.timestamp(), + "s": "on", + "a": {"diff": "attr"}, + } + ], + "sensor.two": [ + { + "lu": sensor_two_last_updated.timestamp(), + "s": "two", + "a": {"any": "attr"}, + } + ], + }, + }, + "id": 1, + "type": "event", + } + + +async def test_history_stream_live_minimal_response( + recorder_mock, hass, hass_ws_client +): + """Test history stream with history and live data and minimal_response.""" + now = dt_util.utcnow() + sort_order = ["sensor.two", "sensor.four", "sensor.one"] + await async_setup_component( + hass, + "history", + { + history.DOMAIN: { + history.CONF_ORDER: True, + CONF_INCLUDE: { + CONF_ENTITIES: sort_order, + CONF_DOMAINS: ["sensor"], + }, + } + }, + ) + await async_setup_component(hass, "sensor", {}) + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.one", "on", attributes={"any": "attr"}) + sensor_one_last_updated = hass.states.get("sensor.one").last_updated + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.two", "off", attributes={"any": "attr"}) + sensor_two_last_updated = hass.states.get("sensor.two").last_updated + await async_recorder_block_till_done(hass) + hass.states.async_set("switch.excluded", "off", attributes={"any": "again"}) + await async_wait_recording_done(hass) + + await async_wait_recording_done(hass) + + client = await hass_ws_client() + await client.send_json( + { + "id": 1, + "type": "history/stream", + "start_time": now.isoformat(), + "include_start_time_state": True, + "significant_changes_only": False, + "no_attributes": False, + "minimal_response": True, + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 1 + assert response["type"] == "result" + + response = await client.receive_json() + first_end_time = sensor_two_last_updated.timestamp() + + assert response == { + "event": { + "end_time": first_end_time, + "start_time": now.timestamp(), + "states": { + "sensor.one": [ + { + "a": {"any": "attr"}, + "lu": sensor_one_last_updated.timestamp(), + "s": "on", + } + ], + "sensor.two": [ + { + "a": {"any": "attr"}, + "lu": sensor_two_last_updated.timestamp(), + "s": "off", + } + ], + }, + }, + "id": 1, + "type": "event", + } + + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.one", "on", attributes={"diff": "attr"}) + hass.states.async_set("sensor.two", "two", attributes={"any": "attr"}) + # Only sensor.two has changed + sensor_one_last_updated = hass.states.get("sensor.one").last_updated + sensor_two_last_updated = hass.states.get("sensor.two").last_updated + hass.states.async_remove("sensor.one") + hass.states.async_remove("sensor.two") + await async_recorder_block_till_done(hass) + + response = await client.receive_json() + assert response == { + "event": { + "states": { + "sensor.two": [ + { + "lu": sensor_two_last_updated.timestamp(), + "s": "two", + "a": {"any": "attr"}, + } + ], + }, + }, + "id": 1, + "type": "event", + } + + +async def test_history_stream_live_no_attributes(recorder_mock, hass, hass_ws_client): + """Test history stream with history and live data and no_attributes.""" + now = dt_util.utcnow() + sort_order = ["sensor.two", "sensor.four", "sensor.one"] + await async_setup_component( + hass, + "history", + { + history.DOMAIN: { + history.CONF_ORDER: True, + CONF_INCLUDE: { + CONF_ENTITIES: sort_order, + CONF_DOMAINS: ["sensor"], + }, + } + }, + ) + await async_setup_component(hass, "sensor", {}) + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.one", "on", attributes={"any": "attr"}) + sensor_one_last_updated = hass.states.get("sensor.one").last_updated + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.two", "off", attributes={"any": "attr"}) + sensor_two_last_updated = hass.states.get("sensor.two").last_updated + await async_recorder_block_till_done(hass) + hass.states.async_set("switch.excluded", "off", attributes={"any": "again"}) + await async_wait_recording_done(hass) + + await async_wait_recording_done(hass) + + client = await hass_ws_client() + await client.send_json( + { + "id": 1, + "type": "history/stream", + "start_time": now.isoformat(), + "include_start_time_state": True, + "significant_changes_only": False, + "no_attributes": True, + "minimal_response": False, + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 1 + assert response["type"] == "result" + + response = await client.receive_json() + first_end_time = sensor_two_last_updated.timestamp() + + assert response == { + "event": { + "end_time": first_end_time, + "start_time": now.timestamp(), + "states": { + "sensor.one": [ + {"a": {}, "lu": sensor_one_last_updated.timestamp(), "s": "on"} + ], + "sensor.two": [ + {"a": {}, "lu": sensor_two_last_updated.timestamp(), "s": "off"} + ], + }, + }, + "id": 1, + "type": "event", + } + + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.one", "one", attributes={"diff": "attr"}) + hass.states.async_set("sensor.two", "two", attributes={"diff": "attr"}) + await async_recorder_block_till_done(hass) + + sensor_one_last_updated = hass.states.get("sensor.one").last_updated + sensor_two_last_updated = hass.states.get("sensor.two").last_updated + response = await client.receive_json() + assert response == { + "event": { + "states": { + "sensor.one": [{"lu": sensor_one_last_updated.timestamp(), "s": "one"}], + "sensor.two": [{"lu": sensor_two_last_updated.timestamp(), "s": "two"}], + }, + }, + "id": 1, + "type": "event", + } + + +async def test_history_stream_live_no_attributes_minimal_response_specific_entities( + recorder_mock, hass, hass_ws_client +): + """Test history stream with history and live data and no_attributes and minimal_response with specific entities.""" + now = dt_util.utcnow() + wanted_entities = ["sensor.two", "sensor.four", "sensor.one"] + await async_setup_component( + hass, + "history", + {history.DOMAIN: {}}, + ) + await async_setup_component(hass, "sensor", {}) + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.one", "on", attributes={"any": "attr"}) + sensor_one_last_updated = hass.states.get("sensor.one").last_updated + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.two", "off", attributes={"any": "attr"}) + sensor_two_last_updated = hass.states.get("sensor.two").last_updated + await async_recorder_block_till_done(hass) + hass.states.async_set("switch.excluded", "off", attributes={"any": "again"}) + await async_wait_recording_done(hass) + + await async_wait_recording_done(hass) + + client = await hass_ws_client() + await client.send_json( + { + "id": 1, + "type": "history/stream", + "entity_ids": wanted_entities, + "start_time": now.isoformat(), + "include_start_time_state": True, + "significant_changes_only": False, + "no_attributes": True, + "minimal_response": True, + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 1 + assert response["type"] == "result" + + response = await client.receive_json() + first_end_time = sensor_two_last_updated.timestamp() + + assert response == { + "event": { + "end_time": first_end_time, + "start_time": now.timestamp(), + "states": { + "sensor.one": [ + {"a": {}, "lu": sensor_one_last_updated.timestamp(), "s": "on"} + ], + "sensor.two": [ + {"a": {}, "lu": sensor_two_last_updated.timestamp(), "s": "off"} + ], + }, + }, + "id": 1, + "type": "event", + } + + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.one", "one", attributes={"any": "attr"}) + hass.states.async_set("sensor.two", "two", attributes={"any": "attr"}) + await async_recorder_block_till_done(hass) + + sensor_one_last_updated = hass.states.get("sensor.one").last_updated + sensor_two_last_updated = hass.states.get("sensor.two").last_updated + response = await client.receive_json() + assert response == { + "event": { + "states": { + "sensor.one": [{"lu": sensor_one_last_updated.timestamp(), "s": "one"}], + "sensor.two": [{"lu": sensor_two_last_updated.timestamp(), "s": "two"}], + }, + }, + "id": 1, + "type": "event", + } + + +async def test_history_stream_live_with_future_end_time( + recorder_mock, hass, hass_ws_client +): + """Test history stream with history and live data with future end time.""" + now = dt_util.utcnow() + wanted_entities = ["sensor.two", "sensor.four", "sensor.one"] + await async_setup_component( + hass, + "history", + {history.DOMAIN: {}}, + ) + await async_setup_component(hass, "sensor", {}) + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.one", "on", attributes={"any": "attr"}) + sensor_one_last_updated = hass.states.get("sensor.one").last_updated + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.two", "off", attributes={"any": "attr"}) + sensor_two_last_updated = hass.states.get("sensor.two").last_updated + await async_recorder_block_till_done(hass) + hass.states.async_set("switch.excluded", "off", attributes={"any": "again"}) + await async_wait_recording_done(hass) + + await async_wait_recording_done(hass) + + future = now + timedelta(seconds=10) + + client = await hass_ws_client() + init_listeners = hass.bus.async_listeners() + await client.send_json( + { + "id": 1, + "type": "history/stream", + "entity_ids": wanted_entities, + "start_time": now.isoformat(), + "end_time": future.isoformat(), + "include_start_time_state": True, + "significant_changes_only": False, + "no_attributes": True, + "minimal_response": True, + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 1 + assert response["type"] == "result" + + response = await client.receive_json() + first_end_time = sensor_two_last_updated.timestamp() + + assert response == { + "event": { + "end_time": first_end_time, + "start_time": now.timestamp(), + "states": { + "sensor.one": [ + {"a": {}, "lu": sensor_one_last_updated.timestamp(), "s": "on"} + ], + "sensor.two": [ + {"a": {}, "lu": sensor_two_last_updated.timestamp(), "s": "off"} + ], + }, + }, + "id": 1, + "type": "event", + } + + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.one", "one", attributes={"any": "attr"}) + hass.states.async_set("sensor.two", "two", attributes={"any": "attr"}) + await async_recorder_block_till_done(hass) + + sensor_one_last_updated = hass.states.get("sensor.one").last_updated + sensor_two_last_updated = hass.states.get("sensor.two").last_updated + response = await client.receive_json() + assert response == { + "event": { + "states": { + "sensor.one": [{"lu": sensor_one_last_updated.timestamp(), "s": "one"}], + "sensor.two": [{"lu": sensor_two_last_updated.timestamp(), "s": "two"}], + }, + }, + "id": 1, + "type": "event", + } + + async_fire_time_changed(hass, future + timedelta(seconds=1)) + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.two", "future", attributes={"any": "attr"}) + # Check our listener got unsubscribed + await async_wait_recording_done(hass) + await async_recorder_block_till_done(hass) + assert listeners_without_writes( + hass.bus.async_listeners() + ) == listeners_without_writes(init_listeners) + + +@pytest.mark.parametrize("include_start_time_state", (True, False)) +async def test_history_stream_before_history_starts( + recorder_mock, hass, hass_ws_client, include_start_time_state +): + """Test history stream before we have history.""" + sort_order = ["sensor.two", "sensor.four", "sensor.one"] + await async_setup_component( + hass, + "history", + { + history.DOMAIN: { + history.CONF_ORDER: True, + CONF_INCLUDE: { + CONF_ENTITIES: sort_order, + CONF_DOMAINS: ["sensor"], + }, + } + }, + ) + await async_setup_component(hass, "sensor", {}) + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.one", "on", attributes={"any": "attr"}) + await async_recorder_block_till_done(hass) + await async_wait_recording_done(hass) + far_past = dt_util.utcnow() - timedelta(days=1000) + far_past_end = far_past + timedelta(seconds=10) + + client = await hass_ws_client() + await client.send_json( + { + "id": 1, + "type": "history/stream", + "entity_ids": ["sensor.one"], + "start_time": far_past.isoformat(), + "end_time": far_past_end.isoformat(), + "include_start_time_state": include_start_time_state, + "significant_changes_only": False, + "no_attributes": True, + "minimal_response": True, + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 1 + assert response["type"] == "result" + + response = await client.receive_json() + assert response == { + "event": { + "end_time": far_past_end.timestamp(), + "start_time": far_past.timestamp(), + "states": {}, + }, + "id": 1, + "type": "event", + } + + +async def test_history_stream_for_entity_with_no_possible_changes( + recorder_mock, hass, hass_ws_client +): + """Test history stream for future with no possible changes where end time is less than or equal to now.""" + sort_order = ["sensor.two", "sensor.four", "sensor.one"] + await async_setup_component( + hass, + "history", + { + history.DOMAIN: { + history.CONF_ORDER: True, + CONF_INCLUDE: { + CONF_ENTITIES: sort_order, + CONF_DOMAINS: ["sensor"], + }, + } + }, + ) + await async_setup_component(hass, "sensor", {}) + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.one", "on", attributes={"any": "attr"}) + await async_recorder_block_till_done(hass) + await async_wait_recording_done(hass) + + last_updated = hass.states.get("sensor.one").last_updated + start_time = last_updated + timedelta(seconds=10) + end_time = start_time + timedelta(seconds=10) + + with freeze_time(end_time): + client = await hass_ws_client() + await client.send_json( + { + "id": 1, + "type": "history/stream", + "entity_ids": ["sensor.one"], + "start_time": start_time.isoformat(), + "end_time": end_time.isoformat(), + "include_start_time_state": False, + "significant_changes_only": False, + "no_attributes": True, + "minimal_response": True, + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 1 + assert response["type"] == "result" + + response = await client.receive_json() + assert response == { + "event": { + "end_time": end_time.timestamp(), + "start_time": start_time.timestamp(), + "states": {}, + }, + "id": 1, + "type": "event", + } + + +async def test_overflow_queue(recorder_mock, hass, hass_ws_client): + """Test overflowing the history stream queue.""" + now = dt_util.utcnow() + wanted_entities = ["sensor.two", "sensor.four", "sensor.one"] + with patch.object(history, "MAX_PENDING_HISTORY_STATES", 5): + await async_setup_component( + hass, + "history", + {history.DOMAIN: {}}, + ) + await async_setup_component(hass, "sensor", {}) + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.one", "on", attributes={"any": "attr"}) + sensor_one_last_updated = hass.states.get("sensor.one").last_updated + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.two", "off", attributes={"any": "attr"}) + sensor_two_last_updated = hass.states.get("sensor.two").last_updated + await async_recorder_block_till_done(hass) + hass.states.async_set("switch.excluded", "off", attributes={"any": "again"}) + await async_wait_recording_done(hass) + + await async_wait_recording_done(hass) + + client = await hass_ws_client() + init_listeners = hass.bus.async_listeners() + + await client.send_json( + { + "id": 1, + "type": "history/stream", + "entity_ids": wanted_entities, + "start_time": now.isoformat(), + "include_start_time_state": True, + "significant_changes_only": False, + "no_attributes": True, + "minimal_response": True, + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 1 + assert response["type"] == "result" + + response = await client.receive_json() + first_end_time = sensor_two_last_updated.timestamp() + + assert response == { + "event": { + "end_time": first_end_time, + "start_time": now.timestamp(), + "states": { + "sensor.one": [ + {"a": {}, "lu": sensor_one_last_updated.timestamp(), "s": "on"} + ], + "sensor.two": [ + {"a": {}, "lu": sensor_two_last_updated.timestamp(), "s": "off"} + ], + }, + }, + "id": 1, + "type": "event", + } + + await async_recorder_block_till_done(hass) + # Overflow the queue + for val in range(10): + hass.states.async_set("sensor.one", str(val), attributes={"any": "attr"}) + hass.states.async_set("sensor.two", str(val), attributes={"any": "attr"}) + await async_recorder_block_till_done(hass) + + assert listeners_without_writes( + hass.bus.async_listeners() + ) == listeners_without_writes(init_listeners)