diff --git a/homeassistant/components/history/__init__.py b/homeassistant/components/history/__init__.py index 6170d40e42b..1d5b8c0f15c 100644 --- a/homeassistant/components/history/__init__.py +++ b/homeassistant/components/history/__init__.py @@ -1,19 +1,16 @@ """Provide pre-made queries on top of the recorder component.""" from __future__ import annotations -import asyncio -from collections.abc import Callable, Iterable, MutableMapping -from dataclasses import dataclass from datetime import datetime as dt, timedelta from http import HTTPStatus import logging import time -from typing import Any, cast +from typing import cast from aiohttp import web import voluptuous as vol -from homeassistant.components import frontend, websocket_api +from homeassistant.components import frontend from homeassistant.components.http import HomeAssistantView from homeassistant.components.recorder import ( DOMAIN as RECORDER_DOMAIN, @@ -27,47 +24,27 @@ from homeassistant.components.recorder.filters import ( sqlalchemy_filter_from_include_exclude_conf, ) from homeassistant.components.recorder.util import session_scope -from homeassistant.components.websocket_api import messages -from homeassistant.components.websocket_api.connection import ActiveConnection -from homeassistant.const import ( - COMPRESSED_STATE_ATTRIBUTES, - COMPRESSED_STATE_LAST_CHANGED, - COMPRESSED_STATE_LAST_UPDATED, - COMPRESSED_STATE_STATE, - EVENT_STATE_CHANGED, -) -from homeassistant.core import ( - CALLBACK_TYPE, - Event, - HomeAssistant, - State, - callback, - is_callback, -) +from homeassistant.core import HomeAssistant import homeassistant.helpers.config_validation as cv from homeassistant.helpers.entityfilter import ( INCLUDE_EXCLUDE_BASE_FILTER_SCHEMA, - EntityFilter, convert_include_exclude_filter, ) -from homeassistant.helpers.event import ( - async_track_point_in_utc_time, - async_track_state_change_event, -) -from homeassistant.helpers.json import JSON_DUMP from homeassistant.helpers.typing import ConfigType import homeassistant.util.dt as dt_util +from . import websocket_api +from .const import ( + DOMAIN, + HISTORY_ENTITIES_FILTER, + HISTORY_FILTERS, + HISTORY_USE_INCLUDE_ORDER, +) +from .helpers import entities_may_have_state_changes_after + _LOGGER = logging.getLogger(__name__) -DOMAIN = "history" -HISTORY_FILTERS = "history_filters" -HISTORY_ENTITIES_FILTER = "history_entities_filter" -HISTORY_USE_INCLUDE_ORDER = "history_use_include_order" -EVENT_COALESCE_TIME = 0.35 - CONF_ORDER = "use_include_order" -MAX_PENDING_HISTORY_STATES = 2048 CONFIG_SCHEMA = vol.Schema( { @@ -79,17 +56,6 @@ CONFIG_SCHEMA = vol.Schema( ) -@dataclass -class HistoryLiveStream: - """Track a history live stream.""" - - stream_queue: asyncio.Queue[Event] - subscriptions: list[CALLBACK_TYPE] - end_time_unsub: CALLBACK_TYPE | None = None - task: asyncio.Task | None = None - wait_sync_task: asyncio.Task | None = None - - async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: """Set up the history hooks.""" conf = config.get(DOMAIN, {}) @@ -114,129 +80,10 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: hass.http.register_view(HistoryPeriodView(filters, use_include_order)) frontend.async_register_built_in_panel(hass, "history", "history", "hass:chart-box") - websocket_api.async_register_command(hass, ws_get_history_during_period) - websocket_api.async_register_command(hass, ws_stream) - + websocket_api.async_setup(hass) return True -def _ws_get_significant_states( - hass: HomeAssistant, - msg_id: int, - start_time: dt, - end_time: dt | None, - entity_ids: list[str] | None, - filters: Filters | None, - use_include_order: bool | None, - include_start_time_state: bool, - significant_changes_only: bool, - minimal_response: bool, - no_attributes: bool, -) -> str: - """Fetch history significant_states and convert them to json in the executor.""" - states = history.get_significant_states( - hass, - start_time, - end_time, - entity_ids, - filters, - include_start_time_state, - significant_changes_only, - minimal_response, - no_attributes, - True, - ) - - if not use_include_order or not filters: - return JSON_DUMP(messages.result_message(msg_id, states)) - - return JSON_DUMP( - messages.result_message( - msg_id, - { - order_entity: states.pop(order_entity) - for order_entity in filters.included_entities - if order_entity in states - } - | states, - ) - ) - - -@websocket_api.websocket_command( - { - vol.Required("type"): "history/history_during_period", - vol.Required("start_time"): str, - vol.Optional("end_time"): str, - vol.Optional("entity_ids"): [str], - vol.Optional("include_start_time_state", default=True): bool, - vol.Optional("significant_changes_only", default=True): bool, - vol.Optional("minimal_response", default=False): bool, - vol.Optional("no_attributes", default=False): bool, - } -) -@websocket_api.async_response -async def ws_get_history_during_period( - hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any] -) -> None: - """Handle history during period websocket command.""" - start_time_str = msg["start_time"] - end_time_str = msg.get("end_time") - - if start_time := dt_util.parse_datetime(start_time_str): - start_time = dt_util.as_utc(start_time) - else: - connection.send_error(msg["id"], "invalid_start_time", "Invalid start_time") - return - - if end_time_str: - if end_time := dt_util.parse_datetime(end_time_str): - end_time = dt_util.as_utc(end_time) - else: - connection.send_error(msg["id"], "invalid_end_time", "Invalid end_time") - return - else: - end_time = None - - if start_time > dt_util.utcnow(): - connection.send_result(msg["id"], {}) - return - - entity_ids = msg.get("entity_ids") - include_start_time_state = msg["include_start_time_state"] - no_attributes = msg["no_attributes"] - - if ( - not include_start_time_state - and entity_ids - and not _entities_may_have_state_changes_after( - hass, entity_ids, start_time, no_attributes - ) - ): - connection.send_result(msg["id"], {}) - return - - significant_changes_only = msg["significant_changes_only"] - minimal_response = msg["minimal_response"] - - connection.send_message( - await get_instance(hass).async_add_executor_job( - _ws_get_significant_states, - hass, - msg["id"], - start_time, - end_time, - entity_ids, - hass.data[HISTORY_FILTERS], - hass.data[HISTORY_USE_INCLUDE_ORDER], - include_start_time_state, - significant_changes_only, - minimal_response, - no_attributes, - ) - ) - - class HistoryPeriodView(HomeAssistantView): """Handle history period requests.""" @@ -292,7 +139,7 @@ class HistoryPeriodView(HomeAssistantView): if ( not include_start_time_state and entity_ids - and not _entities_may_have_state_changes_after( + and not entities_may_have_state_changes_after( hass, entity_ids, start_time, no_attributes ) ): @@ -359,412 +206,3 @@ class HistoryPeriodView(HomeAssistantView): ] sorted_result.extend(list(states.values())) return self.json(sorted_result) - - -def _entities_may_have_state_changes_after( - hass: HomeAssistant, entity_ids: Iterable, start_time: dt, no_attributes: bool -) -> bool: - """Check the state machine to see if entities have changed since start time.""" - for entity_id in entity_ids: - state = hass.states.get(entity_id) - if state is None: - return True - - state_time = state.last_changed if no_attributes else state.last_updated - if state_time > start_time: - return True - - return False - - -def _generate_stream_message( - states: MutableMapping[str, list[dict[str, Any]]], - start_day: dt, - end_day: dt, -) -> dict[str, Any]: - """Generate a history stream message response.""" - return { - "states": states, - "start_time": dt_util.utc_to_timestamp(start_day), - "end_time": dt_util.utc_to_timestamp(end_day), - } - - -@callback -def _async_send_empty_response( - connection: ActiveConnection, msg_id: int, start_time: dt, end_time: dt | None -) -> None: - """Send an empty response when we know all results are filtered away.""" - connection.send_result(msg_id) - stream_end_time = end_time or dt_util.utcnow() - _async_send_response(connection, msg_id, start_time, stream_end_time, {}) - - -@callback -def _async_send_response( - connection: ActiveConnection, - msg_id: int, - start_time: dt, - end_time: dt, - states: MutableMapping[str, list[dict[str, Any]]], -) -> None: - """Send a response.""" - empty_stream_message = _generate_stream_message(states, start_time, end_time) - empty_response = messages.event_message(msg_id, empty_stream_message) - connection.send_message(JSON_DUMP(empty_response)) - - -async def _async_send_historical_states( - hass: HomeAssistant, - connection: ActiveConnection, - msg_id: int, - start_time: dt, - end_time: dt, - entity_ids: list[str] | None, - filters: Filters | None, - include_start_time_state: bool, - significant_changes_only: bool, - minimal_response: bool, - no_attributes: bool, - send_empty: bool, -) -> dt | None: - """Fetch history significant_states and send them to the client.""" - states = cast( - MutableMapping[str, list[dict[str, Any]]], - await get_instance(hass).async_add_executor_job( - history.get_significant_states, - hass, - start_time, - end_time, - entity_ids, - filters, - include_start_time_state, - significant_changes_only, - minimal_response, - no_attributes, - True, - ), - ) - last_time = 0 - - for state_list in states.values(): - if ( - state_list - and (state_last_time := state_list[-1][COMPRESSED_STATE_LAST_UPDATED]) - > last_time - ): - last_time = state_last_time - - if last_time == 0: - # If we did not send any states ever, we need to send an empty response - # so the websocket client knows it should render/process/consume the - # data. - if not send_empty: - return None - last_time_dt = end_time - else: - last_time_dt = dt_util.utc_from_timestamp(last_time) - _async_send_response(connection, msg_id, start_time, last_time_dt, states) - return last_time_dt if last_time != 0 else None - - -def _history_compressed_state(state: State, no_attributes: bool) -> dict[str, Any]: - """Convert a state to a compressed state.""" - comp_state: dict[str, Any] = {COMPRESSED_STATE_STATE: state.state} - if not no_attributes or state.domain in history.NEED_ATTRIBUTE_DOMAINS: - comp_state[COMPRESSED_STATE_ATTRIBUTES] = state.attributes - comp_state[COMPRESSED_STATE_LAST_UPDATED] = dt_util.utc_to_timestamp( - state.last_updated - ) - if state.last_changed != state.last_updated: - comp_state[COMPRESSED_STATE_LAST_CHANGED] = dt_util.utc_to_timestamp( - state.last_changed - ) - return comp_state - - -def _events_to_compressed_states( - events: Iterable[Event], no_attributes: bool -) -> MutableMapping[str, list[dict[str, Any]]]: - """Convert events to a compressed states.""" - states_by_entity_ids: dict[str, list[dict[str, Any]]] = {} - for event in events: - state: State = event.data["new_state"] - entity_id: str = state.entity_id - states_by_entity_ids.setdefault(entity_id, []).append( - _history_compressed_state(state, no_attributes) - ) - return states_by_entity_ids - - -async def _async_events_consumer( - subscriptions_setup_complete_time: dt, - connection: ActiveConnection, - msg_id: int, - stream_queue: asyncio.Queue[Event], - no_attributes: bool, -) -> None: - """Stream events from the queue.""" - while True: - events: list[Event] = [await stream_queue.get()] - # If the event is older than the last db - # event we already sent it so we skip it. - if events[0].time_fired <= subscriptions_setup_complete_time: - continue - # We sleep for the EVENT_COALESCE_TIME so - # we can group events together to minimize - # the number of websocket messages when the - # system is overloaded with an event storm - await asyncio.sleep(EVENT_COALESCE_TIME) - while not stream_queue.empty(): - events.append(stream_queue.get_nowait()) - - if history_states := _events_to_compressed_states(events, no_attributes): - connection.send_message( - JSON_DUMP( - messages.event_message( - msg_id, - {"states": history_states}, - ) - ) - ) - - -@callback -def _async_subscribe_events( - hass: HomeAssistant, - subscriptions: list[CALLBACK_TYPE], - target: Callable[[Event], None], - entities_filter: EntityFilter | None, - entity_ids: list[str] | None, - significant_changes_only: bool, - minimal_response: bool, -) -> None: - """Subscribe to events for the entities and devices or all. - - These are the events we need to listen for to do - the live history stream. - """ - assert is_callback(target), "target must be a callback" - - @callback - def _forward_state_events_filtered(event: Event) -> None: - """Filter state events and forward them.""" - if (new_state := event.data.get("new_state")) is None or ( - old_state := event.data.get("old_state") - ) is None: - return - assert isinstance(new_state, State) - assert isinstance(old_state, State) - if (entities_filter and not entities_filter(new_state.entity_id)) or ( - (significant_changes_only or minimal_response) - and new_state.state == old_state.state - and new_state.domain not in history.SIGNIFICANT_DOMAINS - ): - return - target(event) - - if entity_ids: - subscriptions.append( - async_track_state_change_event( - hass, entity_ids, _forward_state_events_filtered - ) - ) - return - - # We want the firehose - subscriptions.append( - hass.bus.async_listen( - EVENT_STATE_CHANGED, - _forward_state_events_filtered, - run_immediately=True, - ) - ) - - -@websocket_api.websocket_command( - { - vol.Required("type"): "history/stream", - vol.Required("start_time"): str, - vol.Optional("end_time"): str, - vol.Optional("entity_ids"): [str], - vol.Optional("include_start_time_state", default=True): bool, - vol.Optional("significant_changes_only", default=True): bool, - vol.Optional("minimal_response", default=False): bool, - vol.Optional("no_attributes", default=False): bool, - } -) -@websocket_api.async_response -async def ws_stream( - hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any] -) -> None: - """Handle history stream websocket command.""" - start_time_str = msg["start_time"] - msg_id: int = msg["id"] - entity_ids: list[str] | None = msg.get("entity_ids") - utc_now = dt_util.utcnow() - filters: Filters | None = None - entities_filter: EntityFilter | None = None - if not entity_ids: - filters = hass.data[HISTORY_FILTERS] - entities_filter = hass.data[HISTORY_ENTITIES_FILTER] - - if start_time := dt_util.parse_datetime(start_time_str): - start_time = dt_util.as_utc(start_time) - - if not start_time or start_time > utc_now: - connection.send_error(msg_id, "invalid_start_time", "Invalid start_time") - return - - end_time_str = msg.get("end_time") - end_time: dt | None = None - if end_time_str: - if not (end_time := dt_util.parse_datetime(end_time_str)): - connection.send_error(msg_id, "invalid_end_time", "Invalid end_time") - return - end_time = dt_util.as_utc(end_time) - if end_time < start_time: - connection.send_error(msg_id, "invalid_end_time", "Invalid end_time") - return - - entity_ids = msg.get("entity_ids") - include_start_time_state = msg["include_start_time_state"] - significant_changes_only = msg["significant_changes_only"] - no_attributes = msg["no_attributes"] - minimal_response = msg["minimal_response"] - - if end_time and end_time <= utc_now: - if ( - not include_start_time_state - and entity_ids - and not _entities_may_have_state_changes_after( - hass, entity_ids, start_time, no_attributes - ) - ): - _async_send_empty_response(connection, msg_id, start_time, end_time) - return - - connection.subscriptions[msg_id] = callback(lambda: None) - connection.send_result(msg_id) - await _async_send_historical_states( - hass, - connection, - msg_id, - start_time, - end_time, - entity_ids, - filters, - include_start_time_state, - significant_changes_only, - minimal_response, - no_attributes, - True, - ) - return - - subscriptions: list[CALLBACK_TYPE] = [] - stream_queue: asyncio.Queue[Event] = asyncio.Queue(MAX_PENDING_HISTORY_STATES) - live_stream = HistoryLiveStream( - subscriptions=subscriptions, stream_queue=stream_queue - ) - - @callback - def _unsub(*_utc_time: Any) -> None: - """Unsubscribe from all events.""" - for subscription in subscriptions: - subscription() - subscriptions.clear() - if live_stream.task: - live_stream.task.cancel() - if live_stream.wait_sync_task: - live_stream.wait_sync_task.cancel() - if live_stream.end_time_unsub: - live_stream.end_time_unsub() - live_stream.end_time_unsub = None - - if end_time: - live_stream.end_time_unsub = async_track_point_in_utc_time( - hass, _unsub, end_time - ) - - @callback - def _queue_or_cancel(event: Event) -> None: - """Queue an event to be processed or cancel.""" - try: - stream_queue.put_nowait(event) - except asyncio.QueueFull: - _LOGGER.debug( - "Client exceeded max pending messages of %s", - MAX_PENDING_HISTORY_STATES, - ) - _unsub() - - _async_subscribe_events( - hass, - subscriptions, - _queue_or_cancel, - entities_filter, - entity_ids, - significant_changes_only=significant_changes_only, - minimal_response=minimal_response, - ) - subscriptions_setup_complete_time = dt_util.utcnow() - connection.subscriptions[msg_id] = _unsub - connection.send_result(msg_id) - # Fetch everything from history - last_event_time = await _async_send_historical_states( - hass, - connection, - msg_id, - start_time, - subscriptions_setup_complete_time, - entity_ids, - filters, - include_start_time_state, - significant_changes_only, - minimal_response, - no_attributes, - True, - ) - - if msg_id not in connection.subscriptions: - # Unsubscribe happened while sending historical states - return - - live_stream.task = asyncio.create_task( - _async_events_consumer( - subscriptions_setup_complete_time, - connection, - msg_id, - stream_queue, - no_attributes, - ) - ) - - live_stream.wait_sync_task = asyncio.create_task( - get_instance(hass).async_block_till_done() - ) - await live_stream.wait_sync_task - - # - # Fetch any states from the database that have - # not been committed since the original fetch - # so we can switch over to using the subscriptions - # - # We only want states that happened after the last state - # we had from the last database query - # - await _async_send_historical_states( - hass, - connection, - msg_id, - last_event_time or start_time, - subscriptions_setup_complete_time, - entity_ids, - filters, - False, # We don't want the start time state again - significant_changes_only, - minimal_response, - no_attributes, - send_empty=not last_event_time, - ) diff --git a/homeassistant/components/history/const.py b/homeassistant/components/history/const.py new file mode 100644 index 00000000000..bf11329a8b4 --- /dev/null +++ b/homeassistant/components/history/const.py @@ -0,0 +1,9 @@ +"""History integration constants.""" + +DOMAIN = "history" +HISTORY_FILTERS = "history_filters" +HISTORY_ENTITIES_FILTER = "history_entities_filter" +HISTORY_USE_INCLUDE_ORDER = "history_use_include_order" +EVENT_COALESCE_TIME = 0.35 + +MAX_PENDING_HISTORY_STATES = 2048 diff --git a/homeassistant/components/history/helpers.py b/homeassistant/components/history/helpers.py new file mode 100644 index 00000000000..523b1fafb7f --- /dev/null +++ b/homeassistant/components/history/helpers.py @@ -0,0 +1,23 @@ +"""Helpers for the history integration.""" +from __future__ import annotations + +from collections.abc import Iterable +from datetime import datetime as dt + +from homeassistant.core import HomeAssistant + + +def entities_may_have_state_changes_after( + hass: HomeAssistant, entity_ids: Iterable, start_time: dt, no_attributes: bool +) -> bool: + """Check the state machine to see if entities have changed since start time.""" + for entity_id in entity_ids: + state = hass.states.get(entity_id) + if state is None: + return True + + state_time = state.last_changed if no_attributes else state.last_updated + if state_time > start_time: + return True + + return False diff --git a/homeassistant/components/history/websocket_api.py b/homeassistant/components/history/websocket_api.py new file mode 100644 index 00000000000..2bdb8372b20 --- /dev/null +++ b/homeassistant/components/history/websocket_api.py @@ -0,0 +1,578 @@ +"""Websocket API for the history integration.""" +from __future__ import annotations + +import asyncio +from collections.abc import Callable, Iterable, MutableMapping +from dataclasses import dataclass +from datetime import datetime as dt +import logging +from typing import Any, cast + +import voluptuous as vol + +from homeassistant.components import websocket_api +from homeassistant.components.recorder import get_instance, history +from homeassistant.components.recorder.filters import Filters +from homeassistant.components.websocket_api import messages +from homeassistant.components.websocket_api.connection import ActiveConnection +from homeassistant.const import ( + COMPRESSED_STATE_ATTRIBUTES, + COMPRESSED_STATE_LAST_CHANGED, + COMPRESSED_STATE_LAST_UPDATED, + COMPRESSED_STATE_STATE, + EVENT_STATE_CHANGED, +) +from homeassistant.core import ( + CALLBACK_TYPE, + Event, + HomeAssistant, + State, + callback, + is_callback, +) +from homeassistant.helpers.entityfilter import EntityFilter +from homeassistant.helpers.event import ( + async_track_point_in_utc_time, + async_track_state_change_event, +) +from homeassistant.helpers.json import JSON_DUMP +import homeassistant.util.dt as dt_util + +from .const import ( + EVENT_COALESCE_TIME, + HISTORY_ENTITIES_FILTER, + HISTORY_FILTERS, + HISTORY_USE_INCLUDE_ORDER, + MAX_PENDING_HISTORY_STATES, +) +from .helpers import entities_may_have_state_changes_after + +_LOGGER = logging.getLogger(__name__) + + +@dataclass +class HistoryLiveStream: + """Track a history live stream.""" + + stream_queue: asyncio.Queue[Event] + subscriptions: list[CALLBACK_TYPE] + end_time_unsub: CALLBACK_TYPE | None = None + task: asyncio.Task | None = None + wait_sync_task: asyncio.Task | None = None + + +@callback +def async_setup(hass: HomeAssistant) -> None: + """Set up the history websocket API.""" + websocket_api.async_register_command(hass, ws_get_history_during_period) + websocket_api.async_register_command(hass, ws_stream) + + +def _ws_get_significant_states( + hass: HomeAssistant, + msg_id: int, + start_time: dt, + end_time: dt | None, + entity_ids: list[str] | None, + filters: Filters | None, + use_include_order: bool | None, + include_start_time_state: bool, + significant_changes_only: bool, + minimal_response: bool, + no_attributes: bool, +) -> str: + """Fetch history significant_states and convert them to json in the executor.""" + states = history.get_significant_states( + hass, + start_time, + end_time, + entity_ids, + filters, + include_start_time_state, + significant_changes_only, + minimal_response, + no_attributes, + True, + ) + + if not use_include_order or not filters: + return JSON_DUMP(messages.result_message(msg_id, states)) + + return JSON_DUMP( + messages.result_message( + msg_id, + { + order_entity: states.pop(order_entity) + for order_entity in filters.included_entities + if order_entity in states + } + | states, + ) + ) + + +@websocket_api.websocket_command( + { + vol.Required("type"): "history/history_during_period", + vol.Required("start_time"): str, + vol.Optional("end_time"): str, + vol.Optional("entity_ids"): [str], + vol.Optional("include_start_time_state", default=True): bool, + vol.Optional("significant_changes_only", default=True): bool, + vol.Optional("minimal_response", default=False): bool, + vol.Optional("no_attributes", default=False): bool, + } +) +@websocket_api.async_response +async def ws_get_history_during_period( + hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any] +) -> None: + """Handle history during period websocket command.""" + start_time_str = msg["start_time"] + end_time_str = msg.get("end_time") + + if start_time := dt_util.parse_datetime(start_time_str): + start_time = dt_util.as_utc(start_time) + else: + connection.send_error(msg["id"], "invalid_start_time", "Invalid start_time") + return + + if end_time_str: + if end_time := dt_util.parse_datetime(end_time_str): + end_time = dt_util.as_utc(end_time) + else: + connection.send_error(msg["id"], "invalid_end_time", "Invalid end_time") + return + else: + end_time = None + + if start_time > dt_util.utcnow(): + connection.send_result(msg["id"], {}) + return + + entity_ids = msg.get("entity_ids") + include_start_time_state = msg["include_start_time_state"] + no_attributes = msg["no_attributes"] + + if ( + not include_start_time_state + and entity_ids + and not entities_may_have_state_changes_after( + hass, entity_ids, start_time, no_attributes + ) + ): + connection.send_result(msg["id"], {}) + return + + significant_changes_only = msg["significant_changes_only"] + minimal_response = msg["minimal_response"] + + connection.send_message( + await get_instance(hass).async_add_executor_job( + _ws_get_significant_states, + hass, + msg["id"], + start_time, + end_time, + entity_ids, + hass.data[HISTORY_FILTERS], + hass.data[HISTORY_USE_INCLUDE_ORDER], + include_start_time_state, + significant_changes_only, + minimal_response, + no_attributes, + ) + ) + + +def _generate_stream_message( + states: MutableMapping[str, list[dict[str, Any]]], + start_day: dt, + end_day: dt, +) -> dict[str, Any]: + """Generate a history stream message response.""" + return { + "states": states, + "start_time": dt_util.utc_to_timestamp(start_day), + "end_time": dt_util.utc_to_timestamp(end_day), + } + + +@callback +def _async_send_empty_response( + connection: ActiveConnection, msg_id: int, start_time: dt, end_time: dt | None +) -> None: + """Send an empty response when we know all results are filtered away.""" + connection.send_result(msg_id) + stream_end_time = end_time or dt_util.utcnow() + _async_send_response(connection, msg_id, start_time, stream_end_time, {}) + + +@callback +def _async_send_response( + connection: ActiveConnection, + msg_id: int, + start_time: dt, + end_time: dt, + states: MutableMapping[str, list[dict[str, Any]]], +) -> None: + """Send a response.""" + empty_stream_message = _generate_stream_message(states, start_time, end_time) + empty_response = messages.event_message(msg_id, empty_stream_message) + connection.send_message(JSON_DUMP(empty_response)) + + +async def _async_send_historical_states( + hass: HomeAssistant, + connection: ActiveConnection, + msg_id: int, + start_time: dt, + end_time: dt, + entity_ids: list[str] | None, + filters: Filters | None, + include_start_time_state: bool, + significant_changes_only: bool, + minimal_response: bool, + no_attributes: bool, + send_empty: bool, +) -> dt | None: + """Fetch history significant_states and send them to the client.""" + states = cast( + MutableMapping[str, list[dict[str, Any]]], + await get_instance(hass).async_add_executor_job( + history.get_significant_states, + hass, + start_time, + end_time, + entity_ids, + filters, + include_start_time_state, + significant_changes_only, + minimal_response, + no_attributes, + True, + ), + ) + last_time = 0 + + for state_list in states.values(): + if ( + state_list + and (state_last_time := state_list[-1][COMPRESSED_STATE_LAST_UPDATED]) + > last_time + ): + last_time = state_last_time + + if last_time == 0: + # If we did not send any states ever, we need to send an empty response + # so the websocket client knows it should render/process/consume the + # data. + if not send_empty: + return None + last_time_dt = end_time + else: + last_time_dt = dt_util.utc_from_timestamp(last_time) + _async_send_response(connection, msg_id, start_time, last_time_dt, states) + return last_time_dt if last_time != 0 else None + + +def _history_compressed_state(state: State, no_attributes: bool) -> dict[str, Any]: + """Convert a state to a compressed state.""" + comp_state: dict[str, Any] = {COMPRESSED_STATE_STATE: state.state} + if not no_attributes or state.domain in history.NEED_ATTRIBUTE_DOMAINS: + comp_state[COMPRESSED_STATE_ATTRIBUTES] = state.attributes + comp_state[COMPRESSED_STATE_LAST_UPDATED] = dt_util.utc_to_timestamp( + state.last_updated + ) + if state.last_changed != state.last_updated: + comp_state[COMPRESSED_STATE_LAST_CHANGED] = dt_util.utc_to_timestamp( + state.last_changed + ) + return comp_state + + +def _events_to_compressed_states( + events: Iterable[Event], no_attributes: bool +) -> MutableMapping[str, list[dict[str, Any]]]: + """Convert events to a compressed states.""" + states_by_entity_ids: dict[str, list[dict[str, Any]]] = {} + for event in events: + state: State = event.data["new_state"] + entity_id: str = state.entity_id + states_by_entity_ids.setdefault(entity_id, []).append( + _history_compressed_state(state, no_attributes) + ) + return states_by_entity_ids + + +async def _async_events_consumer( + subscriptions_setup_complete_time: dt, + connection: ActiveConnection, + msg_id: int, + stream_queue: asyncio.Queue[Event], + no_attributes: bool, +) -> None: + """Stream events from the queue.""" + while True: + events: list[Event] = [await stream_queue.get()] + # If the event is older than the last db + # event we already sent it so we skip it. + if events[0].time_fired <= subscriptions_setup_complete_time: + continue + # We sleep for the EVENT_COALESCE_TIME so + # we can group events together to minimize + # the number of websocket messages when the + # system is overloaded with an event storm + await asyncio.sleep(EVENT_COALESCE_TIME) + while not stream_queue.empty(): + events.append(stream_queue.get_nowait()) + + if history_states := _events_to_compressed_states(events, no_attributes): + connection.send_message( + JSON_DUMP( + messages.event_message( + msg_id, + {"states": history_states}, + ) + ) + ) + + +@callback +def _async_subscribe_events( + hass: HomeAssistant, + subscriptions: list[CALLBACK_TYPE], + target: Callable[[Event], None], + entities_filter: EntityFilter | None, + entity_ids: list[str] | None, + significant_changes_only: bool, + minimal_response: bool, +) -> None: + """Subscribe to events for the entities and devices or all. + + These are the events we need to listen for to do + the live history stream. + """ + assert is_callback(target), "target must be a callback" + + @callback + def _forward_state_events_filtered(event: Event) -> None: + """Filter state events and forward them.""" + if (new_state := event.data.get("new_state")) is None or ( + old_state := event.data.get("old_state") + ) is None: + return + assert isinstance(new_state, State) + assert isinstance(old_state, State) + if (entities_filter and not entities_filter(new_state.entity_id)) or ( + (significant_changes_only or minimal_response) + and new_state.state == old_state.state + and new_state.domain not in history.SIGNIFICANT_DOMAINS + ): + return + target(event) + + if entity_ids: + subscriptions.append( + async_track_state_change_event( + hass, entity_ids, _forward_state_events_filtered + ) + ) + return + + # We want the firehose + subscriptions.append( + hass.bus.async_listen( + EVENT_STATE_CHANGED, + _forward_state_events_filtered, + run_immediately=True, + ) + ) + + +@websocket_api.websocket_command( + { + vol.Required("type"): "history/stream", + vol.Required("start_time"): str, + vol.Optional("end_time"): str, + vol.Optional("entity_ids"): [str], + vol.Optional("include_start_time_state", default=True): bool, + vol.Optional("significant_changes_only", default=True): bool, + vol.Optional("minimal_response", default=False): bool, + vol.Optional("no_attributes", default=False): bool, + } +) +@websocket_api.async_response +async def ws_stream( + hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any] +) -> None: + """Handle history stream websocket command.""" + start_time_str = msg["start_time"] + msg_id: int = msg["id"] + entity_ids: list[str] | None = msg.get("entity_ids") + utc_now = dt_util.utcnow() + filters: Filters | None = None + entities_filter: EntityFilter | None = None + if not entity_ids: + filters = hass.data[HISTORY_FILTERS] + entities_filter = hass.data[HISTORY_ENTITIES_FILTER] + + if start_time := dt_util.parse_datetime(start_time_str): + start_time = dt_util.as_utc(start_time) + + if not start_time or start_time > utc_now: + connection.send_error(msg_id, "invalid_start_time", "Invalid start_time") + return + + end_time_str = msg.get("end_time") + end_time: dt | None = None + if end_time_str: + if not (end_time := dt_util.parse_datetime(end_time_str)): + connection.send_error(msg_id, "invalid_end_time", "Invalid end_time") + return + end_time = dt_util.as_utc(end_time) + if end_time < start_time: + connection.send_error(msg_id, "invalid_end_time", "Invalid end_time") + return + + entity_ids = msg.get("entity_ids") + include_start_time_state = msg["include_start_time_state"] + significant_changes_only = msg["significant_changes_only"] + no_attributes = msg["no_attributes"] + minimal_response = msg["minimal_response"] + + if end_time and end_time <= utc_now: + if ( + not include_start_time_state + and entity_ids + and not entities_may_have_state_changes_after( + hass, entity_ids, start_time, no_attributes + ) + ): + _async_send_empty_response(connection, msg_id, start_time, end_time) + return + + connection.subscriptions[msg_id] = callback(lambda: None) + connection.send_result(msg_id) + await _async_send_historical_states( + hass, + connection, + msg_id, + start_time, + end_time, + entity_ids, + filters, + include_start_time_state, + significant_changes_only, + minimal_response, + no_attributes, + True, + ) + return + + subscriptions: list[CALLBACK_TYPE] = [] + stream_queue: asyncio.Queue[Event] = asyncio.Queue(MAX_PENDING_HISTORY_STATES) + live_stream = HistoryLiveStream( + subscriptions=subscriptions, stream_queue=stream_queue + ) + + @callback + def _unsub(*_utc_time: Any) -> None: + """Unsubscribe from all events.""" + for subscription in subscriptions: + subscription() + subscriptions.clear() + if live_stream.task: + live_stream.task.cancel() + if live_stream.wait_sync_task: + live_stream.wait_sync_task.cancel() + if live_stream.end_time_unsub: + live_stream.end_time_unsub() + live_stream.end_time_unsub = None + + if end_time: + live_stream.end_time_unsub = async_track_point_in_utc_time( + hass, _unsub, end_time + ) + + @callback + def _queue_or_cancel(event: Event) -> None: + """Queue an event to be processed or cancel.""" + try: + stream_queue.put_nowait(event) + except asyncio.QueueFull: + _LOGGER.debug( + "Client exceeded max pending messages of %s", + MAX_PENDING_HISTORY_STATES, + ) + _unsub() + + _async_subscribe_events( + hass, + subscriptions, + _queue_or_cancel, + entities_filter, + entity_ids, + significant_changes_only=significant_changes_only, + minimal_response=minimal_response, + ) + subscriptions_setup_complete_time = dt_util.utcnow() + connection.subscriptions[msg_id] = _unsub + connection.send_result(msg_id) + # Fetch everything from history + last_event_time = await _async_send_historical_states( + hass, + connection, + msg_id, + start_time, + subscriptions_setup_complete_time, + entity_ids, + filters, + include_start_time_state, + significant_changes_only, + minimal_response, + no_attributes, + True, + ) + + if msg_id not in connection.subscriptions: + # Unsubscribe happened while sending historical states + return + + live_stream.task = asyncio.create_task( + _async_events_consumer( + subscriptions_setup_complete_time, + connection, + msg_id, + stream_queue, + no_attributes, + ) + ) + + live_stream.wait_sync_task = asyncio.create_task( + get_instance(hass).async_block_till_done() + ) + await live_stream.wait_sync_task + + # + # Fetch any states from the database that have + # not been committed since the original fetch + # so we can switch over to using the subscriptions + # + # We only want states that happened after the last state + # we had from the last database query + # + await _async_send_historical_states( + hass, + connection, + msg_id, + last_event_time or start_time, + subscriptions_setup_complete_time, + entity_ids, + filters, + False, # We don't want the start time state again + significant_changes_only, + minimal_response, + no_attributes, + send_empty=not last_event_time, + ) diff --git a/tests/components/history/test_init.py b/tests/components/history/test_init.py index 777f5cfb8bf..0774cb520b5 100644 --- a/tests/components/history/test_init.py +++ b/tests/components/history/test_init.py @@ -5,8 +5,6 @@ from http import HTTPStatus import json from unittest.mock import patch, sentinel -import async_timeout -from freezegun import freeze_time import pytest from homeassistant.components import history @@ -24,9 +22,7 @@ from homeassistant.helpers.json import JSONEncoder from homeassistant.setup import async_setup_component import homeassistant.util.dt as dt_util -from tests.common import async_fire_time_changed from tests.components.recorder.common import ( - async_recorder_block_till_done, async_wait_recording_done, wait_recording_done, ) @@ -855,1580 +851,3 @@ async def test_entity_ids_limit_via_api_with_skip_initial_state( assert len(response_json) == 2 assert response_json[0][0]["entity_id"] == "light.kitchen" assert response_json[1][0]["entity_id"] == "light.cow" - - -async def test_history_during_period(recorder_mock, hass, hass_ws_client): - """Test history_during_period.""" - now = dt_util.utcnow() - - await async_setup_component(hass, "history", {}) - await async_setup_component(hass, "sensor", {}) - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.test", "on", attributes={"any": "attr"}) - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.test", "off", attributes={"any": "attr"}) - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.test", "off", attributes={"any": "changed"}) - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.test", "off", attributes={"any": "again"}) - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.test", "on", attributes={"any": "attr"}) - await async_wait_recording_done(hass) - - await async_wait_recording_done(hass) - - client = await hass_ws_client() - await client.send_json( - { - "id": 1, - "type": "history/history_during_period", - "start_time": now.isoformat(), - "end_time": now.isoformat(), - "entity_ids": ["sensor.test"], - "include_start_time_state": True, - "significant_changes_only": False, - "no_attributes": True, - } - ) - response = await client.receive_json() - assert response["success"] - assert response["result"] == {} - - await client.send_json( - { - "id": 2, - "type": "history/history_during_period", - "start_time": now.isoformat(), - "entity_ids": ["sensor.test"], - "include_start_time_state": True, - "significant_changes_only": False, - "no_attributes": True, - "minimal_response": True, - } - ) - response = await client.receive_json() - assert response["success"] - assert response["id"] == 2 - - sensor_test_history = response["result"]["sensor.test"] - assert len(sensor_test_history) == 3 - - assert sensor_test_history[0]["s"] == "on" - assert sensor_test_history[0]["a"] == {} - assert isinstance(sensor_test_history[0]["lu"], float) - assert "lc" not in sensor_test_history[0] # skipped if the same a last_updated (lu) - - assert "a" not in sensor_test_history[1] - assert sensor_test_history[1]["s"] == "off" - assert isinstance(sensor_test_history[1]["lu"], float) - assert "lc" not in sensor_test_history[1] # skipped if the same a last_updated (lu) - - assert sensor_test_history[2]["s"] == "on" - assert "a" not in sensor_test_history[2] - - await client.send_json( - { - "id": 3, - "type": "history/history_during_period", - "start_time": now.isoformat(), - "entity_ids": ["sensor.test"], - "include_start_time_state": True, - "significant_changes_only": False, - "no_attributes": False, - } - ) - response = await client.receive_json() - assert response["success"] - assert response["id"] == 3 - sensor_test_history = response["result"]["sensor.test"] - - assert len(sensor_test_history) == 5 - - assert sensor_test_history[0]["s"] == "on" - assert sensor_test_history[0]["a"] == {"any": "attr"} - assert isinstance(sensor_test_history[0]["lu"], float) - assert "lc" not in sensor_test_history[0] # skipped if the same a last_updated (lu) - - assert sensor_test_history[1]["s"] == "off" - assert isinstance(sensor_test_history[1]["lu"], float) - assert "lc" not in sensor_test_history[1] # skipped if the same a last_updated (lu) - assert sensor_test_history[1]["a"] == {"any": "attr"} - - assert sensor_test_history[4]["s"] == "on" - assert sensor_test_history[4]["a"] == {"any": "attr"} - - await client.send_json( - { - "id": 4, - "type": "history/history_during_period", - "start_time": now.isoformat(), - "entity_ids": ["sensor.test"], - "include_start_time_state": True, - "significant_changes_only": True, - "no_attributes": False, - } - ) - response = await client.receive_json() - assert response["success"] - assert response["id"] == 4 - sensor_test_history = response["result"]["sensor.test"] - - assert len(sensor_test_history) == 3 - - assert sensor_test_history[0]["s"] == "on" - assert sensor_test_history[0]["a"] == {"any": "attr"} - assert isinstance(sensor_test_history[0]["lu"], float) - assert "lc" not in sensor_test_history[0] # skipped if the same a last_updated (lu) - - assert sensor_test_history[1]["s"] == "off" - assert isinstance(sensor_test_history[1]["lu"], float) - assert "lc" not in sensor_test_history[1] # skipped if the same a last_updated (lu) - assert sensor_test_history[1]["a"] == {"any": "attr"} - - assert sensor_test_history[2]["s"] == "on" - assert sensor_test_history[2]["a"] == {"any": "attr"} - - -async def test_history_during_period_impossible_conditions( - recorder_mock, hass, hass_ws_client -): - """Test history_during_period returns when condition cannot be true.""" - await async_setup_component(hass, "history", {}) - await async_setup_component(hass, "sensor", {}) - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.test", "on", attributes={"any": "attr"}) - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.test", "off", attributes={"any": "attr"}) - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.test", "off", attributes={"any": "changed"}) - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.test", "off", attributes={"any": "again"}) - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.test", "on", attributes={"any": "attr"}) - await async_wait_recording_done(hass) - - await async_wait_recording_done(hass) - - after = dt_util.utcnow() - - client = await hass_ws_client() - await client.send_json( - { - "id": 1, - "type": "history/history_during_period", - "start_time": after.isoformat(), - "end_time": after.isoformat(), - "entity_ids": ["sensor.test"], - "include_start_time_state": False, - "significant_changes_only": False, - "no_attributes": True, - } - ) - response = await client.receive_json() - assert response["success"] - assert response["id"] == 1 - assert response["result"] == {} - - future = dt_util.utcnow() + timedelta(hours=10) - - await client.send_json( - { - "id": 2, - "type": "history/history_during_period", - "start_time": future.isoformat(), - "entity_ids": ["sensor.test"], - "include_start_time_state": True, - "significant_changes_only": True, - "no_attributes": True, - } - ) - response = await client.receive_json() - assert response["success"] - assert response["id"] == 2 - assert response["result"] == {} - - -@pytest.mark.parametrize( - "time_zone", ["UTC", "Europe/Berlin", "America/Chicago", "US/Hawaii"] -) -async def test_history_during_period_significant_domain( - time_zone, recorder_mock, hass, hass_ws_client -): - """Test history_during_period with climate domain.""" - hass.config.set_time_zone(time_zone) - now = dt_util.utcnow() - - await async_setup_component(hass, "history", {}) - await async_setup_component(hass, "sensor", {}) - await async_recorder_block_till_done(hass) - hass.states.async_set("climate.test", "on", attributes={"temperature": "1"}) - await async_recorder_block_till_done(hass) - hass.states.async_set("climate.test", "off", attributes={"temperature": "2"}) - await async_recorder_block_till_done(hass) - hass.states.async_set("climate.test", "off", attributes={"temperature": "3"}) - await async_recorder_block_till_done(hass) - hass.states.async_set("climate.test", "off", attributes={"temperature": "4"}) - await async_recorder_block_till_done(hass) - hass.states.async_set("climate.test", "on", attributes={"temperature": "5"}) - await async_wait_recording_done(hass) - - await async_wait_recording_done(hass) - - client = await hass_ws_client() - await client.send_json( - { - "id": 1, - "type": "history/history_during_period", - "start_time": now.isoformat(), - "end_time": now.isoformat(), - "entity_ids": ["climate.test"], - "include_start_time_state": True, - "significant_changes_only": False, - "no_attributes": True, - } - ) - response = await client.receive_json() - assert response["success"] - assert response["result"] == {} - - await client.send_json( - { - "id": 2, - "type": "history/history_during_period", - "start_time": now.isoformat(), - "entity_ids": ["climate.test"], - "include_start_time_state": True, - "significant_changes_only": False, - "no_attributes": True, - "minimal_response": True, - } - ) - response = await client.receive_json() - assert response["success"] - assert response["id"] == 2 - - sensor_test_history = response["result"]["climate.test"] - assert len(sensor_test_history) == 5 - - assert sensor_test_history[0]["s"] == "on" - assert sensor_test_history[0]["a"] == {} - assert isinstance(sensor_test_history[0]["lu"], float) - assert "lc" not in sensor_test_history[0] # skipped if the same a last_updated (lu) - - assert "a" in sensor_test_history[1] - assert sensor_test_history[1]["s"] == "off" - assert "lc" not in sensor_test_history[1] # skipped if the same a last_updated (lu) - - assert sensor_test_history[4]["s"] == "on" - assert sensor_test_history[4]["a"] == {} - - await client.send_json( - { - "id": 3, - "type": "history/history_during_period", - "start_time": now.isoformat(), - "entity_ids": ["climate.test"], - "include_start_time_state": True, - "significant_changes_only": False, - "no_attributes": False, - } - ) - response = await client.receive_json() - assert response["success"] - assert response["id"] == 3 - sensor_test_history = response["result"]["climate.test"] - - assert len(sensor_test_history) == 5 - - assert sensor_test_history[0]["s"] == "on" - assert sensor_test_history[0]["a"] == {"temperature": "1"} - assert isinstance(sensor_test_history[0]["lu"], float) - assert "lc" not in sensor_test_history[0] # skipped if the same a last_updated (lu) - - assert sensor_test_history[1]["s"] == "off" - assert isinstance(sensor_test_history[1]["lu"], float) - assert "lc" not in sensor_test_history[1] # skipped if the same a last_updated (lu) - assert sensor_test_history[1]["a"] == {"temperature": "2"} - - assert sensor_test_history[4]["s"] == "on" - assert sensor_test_history[4]["a"] == {"temperature": "5"} - - await client.send_json( - { - "id": 4, - "type": "history/history_during_period", - "start_time": now.isoformat(), - "entity_ids": ["climate.test"], - "include_start_time_state": True, - "significant_changes_only": True, - "no_attributes": False, - } - ) - response = await client.receive_json() - assert response["success"] - assert response["id"] == 4 - sensor_test_history = response["result"]["climate.test"] - - assert len(sensor_test_history) == 5 - - assert sensor_test_history[0]["s"] == "on" - assert sensor_test_history[0]["a"] == {"temperature": "1"} - assert isinstance(sensor_test_history[0]["lu"], float) - assert "lc" not in sensor_test_history[0] # skipped if the same a last_updated (lu) - - assert sensor_test_history[1]["s"] == "off" - assert isinstance(sensor_test_history[1]["lu"], float) - assert "lc" not in sensor_test_history[1] # skipped if the same a last_updated (lu) - assert sensor_test_history[1]["a"] == {"temperature": "2"} - - assert sensor_test_history[2]["s"] == "off" - assert sensor_test_history[2]["a"] == {"temperature": "3"} - - assert sensor_test_history[3]["s"] == "off" - assert sensor_test_history[3]["a"] == {"temperature": "4"} - - assert sensor_test_history[4]["s"] == "on" - assert sensor_test_history[4]["a"] == {"temperature": "5"} - - # Test we impute the state time state - later = dt_util.utcnow() - await client.send_json( - { - "id": 5, - "type": "history/history_during_period", - "start_time": later.isoformat(), - "entity_ids": ["climate.test"], - "include_start_time_state": True, - "significant_changes_only": True, - "no_attributes": False, - } - ) - response = await client.receive_json() - assert response["success"] - assert response["id"] == 5 - sensor_test_history = response["result"]["climate.test"] - - assert len(sensor_test_history) == 1 - - assert sensor_test_history[0]["s"] == "on" - assert sensor_test_history[0]["a"] == {"temperature": "5"} - assert sensor_test_history[0]["lu"] == later.timestamp() - assert "lc" not in sensor_test_history[0] # skipped if the same a last_updated (lu) - - -async def test_history_during_period_bad_start_time( - recorder_mock, hass, hass_ws_client -): - """Test history_during_period bad state time.""" - await async_setup_component( - hass, - "history", - {"history": {}}, - ) - - client = await hass_ws_client() - await client.send_json( - { - "id": 1, - "type": "history/history_during_period", - "start_time": "cats", - } - ) - response = await client.receive_json() - assert not response["success"] - assert response["error"]["code"] == "invalid_start_time" - - -async def test_history_during_period_bad_end_time(recorder_mock, hass, hass_ws_client): - """Test history_during_period bad end time.""" - now = dt_util.utcnow() - - await async_setup_component( - hass, - "history", - {"history": {}}, - ) - - client = await hass_ws_client() - await client.send_json( - { - "id": 1, - "type": "history/history_during_period", - "start_time": now.isoformat(), - "end_time": "dogs", - } - ) - response = await client.receive_json() - assert not response["success"] - assert response["error"]["code"] == "invalid_end_time" - - -async def test_history_during_period_with_use_include_order( - recorder_mock, hass, hass_ws_client -): - """Test history_during_period.""" - now = dt_util.utcnow() - sort_order = ["sensor.two", "sensor.four", "sensor.one"] - await async_setup_component( - hass, - "history", - { - history.DOMAIN: { - history.CONF_ORDER: True, - CONF_INCLUDE: { - CONF_ENTITIES: sort_order, - CONF_DOMAINS: ["sensor"], - }, - } - }, - ) - await async_setup_component(hass, "sensor", {}) - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.one", "on", attributes={"any": "attr"}) - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.two", "off", attributes={"any": "attr"}) - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.three", "off", attributes={"any": "changed"}) - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.four", "off", attributes={"any": "again"}) - await async_recorder_block_till_done(hass) - hass.states.async_set("switch.excluded", "off", attributes={"any": "again"}) - await async_wait_recording_done(hass) - - await async_wait_recording_done(hass) - - client = await hass_ws_client() - await client.send_json( - { - "id": 1, - "type": "history/history_during_period", - "start_time": now.isoformat(), - "include_start_time_state": True, - "significant_changes_only": False, - "no_attributes": True, - "minimal_response": True, - } - ) - response = await client.receive_json() - assert response["success"] - assert response["id"] == 1 - - assert list(response["result"]) == [ - *sort_order, - "sensor.three", - ] - - -async def test_history_stream_historical_only(recorder_mock, hass, hass_ws_client): - """Test history stream.""" - now = dt_util.utcnow() - sort_order = ["sensor.two", "sensor.four", "sensor.one"] - await async_setup_component( - hass, - "history", - { - history.DOMAIN: { - history.CONF_ORDER: True, - CONF_INCLUDE: { - CONF_ENTITIES: sort_order, - CONF_DOMAINS: ["sensor"], - }, - } - }, - ) - await async_setup_component(hass, "sensor", {}) - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.one", "on", attributes={"any": "attr"}) - sensor_one_last_updated = hass.states.get("sensor.one").last_updated - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.two", "off", attributes={"any": "attr"}) - sensor_two_last_updated = hass.states.get("sensor.two").last_updated - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.three", "off", attributes={"any": "changed"}) - sensor_three_last_updated = hass.states.get("sensor.three").last_updated - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.four", "off", attributes={"any": "again"}) - sensor_four_last_updated = hass.states.get("sensor.four").last_updated - await async_recorder_block_till_done(hass) - hass.states.async_set("switch.excluded", "off", attributes={"any": "again"}) - await async_wait_recording_done(hass) - - await async_wait_recording_done(hass) - end_time = dt_util.utcnow() - - client = await hass_ws_client() - await client.send_json( - { - "id": 1, - "type": "history/stream", - "start_time": now.isoformat(), - "end_time": end_time.isoformat(), - "include_start_time_state": True, - "significant_changes_only": False, - "no_attributes": True, - "minimal_response": True, - } - ) - response = await client.receive_json() - assert response["success"] - assert response["id"] == 1 - assert response["type"] == "result" - - response = await client.receive_json() - - assert response == { - "event": { - "end_time": sensor_four_last_updated.timestamp(), - "start_time": now.timestamp(), - "states": { - "sensor.four": [ - {"a": {}, "lu": sensor_four_last_updated.timestamp(), "s": "off"} - ], - "sensor.one": [ - {"a": {}, "lu": sensor_one_last_updated.timestamp(), "s": "on"} - ], - "sensor.three": [ - {"a": {}, "lu": sensor_three_last_updated.timestamp(), "s": "off"} - ], - "sensor.two": [ - {"a": {}, "lu": sensor_two_last_updated.timestamp(), "s": "off"} - ], - }, - }, - "id": 1, - "type": "event", - } - - -async def test_history_stream_significant_domain_historical_only( - recorder_mock, hass, hass_ws_client -): - """Test the stream with climate domain with historical states only.""" - now = dt_util.utcnow() - - await async_setup_component(hass, "history", {}) - await async_setup_component(hass, "sensor", {}) - await async_recorder_block_till_done(hass) - hass.states.async_set("climate.test", "on", attributes={"temperature": "1"}) - await async_recorder_block_till_done(hass) - hass.states.async_set("climate.test", "off", attributes={"temperature": "2"}) - await async_recorder_block_till_done(hass) - hass.states.async_set("climate.test", "off", attributes={"temperature": "3"}) - await async_recorder_block_till_done(hass) - hass.states.async_set("climate.test", "off", attributes={"temperature": "4"}) - await async_recorder_block_till_done(hass) - hass.states.async_set("climate.test", "on", attributes={"temperature": "5"}) - await async_wait_recording_done(hass) - - await async_wait_recording_done(hass) - - client = await hass_ws_client() - await client.send_json( - { - "id": 1, - "type": "history/stream", - "start_time": now.isoformat(), - "end_time": now.isoformat(), - "entity_ids": ["climate.test"], - "include_start_time_state": True, - "significant_changes_only": False, - "no_attributes": True, - } - ) - async with async_timeout.timeout(3): - response = await client.receive_json() - assert response["success"] - assert response["id"] == 1 - assert response["type"] == "result" - async with async_timeout.timeout(3): - response = await client.receive_json() - assert response == { - "event": { - "end_time": now.timestamp(), - "start_time": now.timestamp(), - "states": {}, - }, - "id": 1, - "type": "event", - } - - end_time = dt_util.utcnow() - await client.send_json( - { - "id": 2, - "type": "history/stream", - "start_time": now.isoformat(), - "end_time": end_time.isoformat(), - "entity_ids": ["climate.test"], - "include_start_time_state": True, - "significant_changes_only": False, - "no_attributes": True, - "minimal_response": True, - } - ) - async with async_timeout.timeout(3): - response = await client.receive_json() - assert response["success"] - assert response["id"] == 2 - assert response["type"] == "result" - - async with async_timeout.timeout(3): - response = await client.receive_json() - sensor_test_history = response["event"]["states"]["climate.test"] - assert len(sensor_test_history) == 5 - - assert sensor_test_history[0]["s"] == "on" - assert sensor_test_history[0]["a"] == {} - assert isinstance(sensor_test_history[0]["lu"], float) - assert "lc" not in sensor_test_history[0] # skipped if the same a last_updated (lu) - - assert "a" in sensor_test_history[1] - assert sensor_test_history[1]["s"] == "off" - assert "lc" not in sensor_test_history[1] # skipped if the same a last_updated (lu) - - assert sensor_test_history[4]["s"] == "on" - assert sensor_test_history[4]["a"] == {} - - await client.send_json( - { - "id": 3, - "type": "history/stream", - "start_time": now.isoformat(), - "end_time": end_time.isoformat(), - "entity_ids": ["climate.test"], - "include_start_time_state": True, - "significant_changes_only": False, - "no_attributes": False, - } - ) - async with async_timeout.timeout(3): - response = await client.receive_json() - assert response["success"] - assert response["id"] == 3 - assert response["type"] == "result" - - async with async_timeout.timeout(3): - response = await client.receive_json() - sensor_test_history = response["event"]["states"]["climate.test"] - - assert len(sensor_test_history) == 5 - - assert sensor_test_history[0]["s"] == "on" - assert sensor_test_history[0]["a"] == {"temperature": "1"} - assert isinstance(sensor_test_history[0]["lu"], float) - assert "lc" not in sensor_test_history[0] # skipped if the same a last_updated (lu) - - assert sensor_test_history[1]["s"] == "off" - assert isinstance(sensor_test_history[1]["lu"], float) - assert "lc" not in sensor_test_history[1] # skipped if the same a last_updated (lu) - assert sensor_test_history[1]["a"] == {"temperature": "2"} - - assert sensor_test_history[4]["s"] == "on" - assert sensor_test_history[4]["a"] == {"temperature": "5"} - - await client.send_json( - { - "id": 4, - "type": "history/stream", - "start_time": now.isoformat(), - "end_time": end_time.isoformat(), - "entity_ids": ["climate.test"], - "include_start_time_state": True, - "significant_changes_only": True, - "no_attributes": False, - } - ) - async with async_timeout.timeout(3): - response = await client.receive_json() - assert response["success"] - assert response["id"] == 4 - assert response["type"] == "result" - - async with async_timeout.timeout(3): - response = await client.receive_json() - sensor_test_history = response["event"]["states"]["climate.test"] - - assert len(sensor_test_history) == 5 - - assert sensor_test_history[0]["s"] == "on" - assert sensor_test_history[0]["a"] == {"temperature": "1"} - assert isinstance(sensor_test_history[0]["lu"], float) - assert "lc" not in sensor_test_history[0] # skipped if the same a last_updated (lu) - - assert sensor_test_history[1]["s"] == "off" - assert isinstance(sensor_test_history[1]["lu"], float) - assert "lc" not in sensor_test_history[1] # skipped if the same a last_updated (lu) - assert sensor_test_history[1]["a"] == {"temperature": "2"} - - assert sensor_test_history[2]["s"] == "off" - assert sensor_test_history[2]["a"] == {"temperature": "3"} - - assert sensor_test_history[3]["s"] == "off" - assert sensor_test_history[3]["a"] == {"temperature": "4"} - - assert sensor_test_history[4]["s"] == "on" - assert sensor_test_history[4]["a"] == {"temperature": "5"} - - # Test we impute the state time state - later = dt_util.utcnow() - await client.send_json( - { - "id": 5, - "type": "history/stream", - "start_time": later.isoformat(), - "end_time": later.isoformat(), - "entity_ids": ["climate.test"], - "include_start_time_state": True, - "significant_changes_only": True, - "no_attributes": False, - } - ) - async with async_timeout.timeout(3): - response = await client.receive_json() - assert response["success"] - assert response["id"] == 5 - assert response["type"] == "result" - - async with async_timeout.timeout(3): - response = await client.receive_json() - sensor_test_history = response["event"]["states"]["climate.test"] - - assert len(sensor_test_history) == 1 - - assert sensor_test_history[0]["s"] == "on" - assert sensor_test_history[0]["a"] == {"temperature": "5"} - assert sensor_test_history[0]["lu"] == later.timestamp() - assert "lc" not in sensor_test_history[0] # skipped if the same a last_updated (lu) - - -async def test_history_stream_bad_start_time(recorder_mock, hass, hass_ws_client): - """Test history stream bad state time.""" - await async_setup_component( - hass, - "history", - {"history": {}}, - ) - - client = await hass_ws_client() - await client.send_json( - { - "id": 1, - "type": "history/stream", - "start_time": "cats", - } - ) - response = await client.receive_json() - assert not response["success"] - assert response["error"]["code"] == "invalid_start_time" - - -async def test_history_stream_end_time_before_start_time( - recorder_mock, hass, hass_ws_client -): - """Test history stream with an end_time before the start_time.""" - end_time = dt_util.utcnow() - timedelta(seconds=2) - start_time = dt_util.utcnow() - timedelta(seconds=1) - - await async_setup_component( - hass, - "history", - {"history": {}}, - ) - - client = await hass_ws_client() - await client.send_json( - { - "id": 1, - "type": "history/stream", - "start_time": start_time.isoformat(), - "end_time": end_time.isoformat(), - } - ) - response = await client.receive_json() - assert not response["success"] - assert response["error"]["code"] == "invalid_end_time" - - -async def test_history_stream_bad_end_time(recorder_mock, hass, hass_ws_client): - """Test history stream bad end time.""" - now = dt_util.utcnow() - - await async_setup_component( - hass, - "history", - {"history": {}}, - ) - - client = await hass_ws_client() - await client.send_json( - { - "id": 1, - "type": "history/stream", - "start_time": now.isoformat(), - "end_time": "dogs", - } - ) - response = await client.receive_json() - assert not response["success"] - assert response["error"]["code"] == "invalid_end_time" - - -async def test_history_stream_live_no_attributes_minimal_response( - recorder_mock, hass, hass_ws_client -): - """Test history stream with history and live data and no_attributes and minimal_response.""" - now = dt_util.utcnow() - sort_order = ["sensor.two", "sensor.four", "sensor.one"] - await async_setup_component( - hass, - "history", - { - history.DOMAIN: { - history.CONF_ORDER: True, - CONF_INCLUDE: { - CONF_ENTITIES: sort_order, - CONF_DOMAINS: ["sensor"], - }, - } - }, - ) - await async_setup_component(hass, "sensor", {}) - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.one", "on", attributes={"any": "attr"}) - sensor_one_last_updated = hass.states.get("sensor.one").last_updated - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.two", "off", attributes={"any": "attr"}) - sensor_two_last_updated = hass.states.get("sensor.two").last_updated - await async_recorder_block_till_done(hass) - hass.states.async_set("switch.excluded", "off", attributes={"any": "again"}) - await async_wait_recording_done(hass) - - await async_wait_recording_done(hass) - - client = await hass_ws_client() - await client.send_json( - { - "id": 1, - "type": "history/stream", - "start_time": now.isoformat(), - "include_start_time_state": True, - "significant_changes_only": False, - "no_attributes": True, - "minimal_response": True, - } - ) - response = await client.receive_json() - assert response["success"] - assert response["id"] == 1 - assert response["type"] == "result" - - response = await client.receive_json() - first_end_time = sensor_two_last_updated.timestamp() - - assert response == { - "event": { - "end_time": first_end_time, - "start_time": now.timestamp(), - "states": { - "sensor.one": [ - {"a": {}, "lu": sensor_one_last_updated.timestamp(), "s": "on"} - ], - "sensor.two": [ - {"a": {}, "lu": sensor_two_last_updated.timestamp(), "s": "off"} - ], - }, - }, - "id": 1, - "type": "event", - } - - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.one", "one", attributes={"any": "attr"}) - hass.states.async_set("sensor.two", "two", attributes={"any": "attr"}) - await async_recorder_block_till_done(hass) - - sensor_one_last_updated = hass.states.get("sensor.one").last_updated - sensor_two_last_updated = hass.states.get("sensor.two").last_updated - response = await client.receive_json() - assert response == { - "event": { - "states": { - "sensor.one": [{"lu": sensor_one_last_updated.timestamp(), "s": "one"}], - "sensor.two": [{"lu": sensor_two_last_updated.timestamp(), "s": "two"}], - }, - }, - "id": 1, - "type": "event", - } - - -async def test_history_stream_live(recorder_mock, hass, hass_ws_client): - """Test history stream with history and live data.""" - now = dt_util.utcnow() - sort_order = ["sensor.two", "sensor.four", "sensor.one"] - await async_setup_component( - hass, - "history", - { - history.DOMAIN: { - history.CONF_ORDER: True, - CONF_INCLUDE: { - CONF_ENTITIES: sort_order, - CONF_DOMAINS: ["sensor"], - }, - } - }, - ) - await async_setup_component(hass, "sensor", {}) - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.one", "on", attributes={"any": "attr"}) - sensor_one_last_updated = hass.states.get("sensor.one").last_updated - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.two", "off", attributes={"any": "attr"}) - sensor_two_last_updated = hass.states.get("sensor.two").last_updated - await async_recorder_block_till_done(hass) - hass.states.async_set("switch.excluded", "off", attributes={"any": "again"}) - await async_wait_recording_done(hass) - - await async_wait_recording_done(hass) - - client = await hass_ws_client() - await client.send_json( - { - "id": 1, - "type": "history/stream", - "start_time": now.isoformat(), - "include_start_time_state": True, - "significant_changes_only": False, - "no_attributes": False, - "minimal_response": False, - } - ) - response = await client.receive_json() - assert response["success"] - assert response["id"] == 1 - assert response["type"] == "result" - - response = await client.receive_json() - first_end_time = sensor_two_last_updated.timestamp() - - assert response == { - "event": { - "end_time": first_end_time, - "start_time": now.timestamp(), - "states": { - "sensor.one": [ - { - "a": {"any": "attr"}, - "lu": sensor_one_last_updated.timestamp(), - "s": "on", - } - ], - "sensor.two": [ - { - "a": {"any": "attr"}, - "lu": sensor_two_last_updated.timestamp(), - "s": "off", - } - ], - }, - }, - "id": 1, - "type": "event", - } - - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.one", "on", attributes={"diff": "attr"}) - hass.states.async_set("sensor.two", "two", attributes={"any": "attr"}) - await async_recorder_block_till_done(hass) - - sensor_one_last_updated = hass.states.get("sensor.one").last_updated - sensor_one_last_changed = hass.states.get("sensor.one").last_changed - sensor_two_last_updated = hass.states.get("sensor.two").last_updated - response = await client.receive_json() - assert response == { - "event": { - "states": { - "sensor.one": [ - { - "lc": sensor_one_last_changed.timestamp(), - "lu": sensor_one_last_updated.timestamp(), - "s": "on", - "a": {"diff": "attr"}, - } - ], - "sensor.two": [ - { - "lu": sensor_two_last_updated.timestamp(), - "s": "two", - "a": {"any": "attr"}, - } - ], - }, - }, - "id": 1, - "type": "event", - } - - -async def test_history_stream_live_minimal_response( - recorder_mock, hass, hass_ws_client -): - """Test history stream with history and live data and minimal_response.""" - now = dt_util.utcnow() - sort_order = ["sensor.two", "sensor.four", "sensor.one"] - await async_setup_component( - hass, - "history", - { - history.DOMAIN: { - history.CONF_ORDER: True, - CONF_INCLUDE: { - CONF_ENTITIES: sort_order, - CONF_DOMAINS: ["sensor"], - }, - } - }, - ) - await async_setup_component(hass, "sensor", {}) - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.one", "on", attributes={"any": "attr"}) - sensor_one_last_updated = hass.states.get("sensor.one").last_updated - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.two", "off", attributes={"any": "attr"}) - sensor_two_last_updated = hass.states.get("sensor.two").last_updated - await async_recorder_block_till_done(hass) - hass.states.async_set("switch.excluded", "off", attributes={"any": "again"}) - await async_wait_recording_done(hass) - - await async_wait_recording_done(hass) - - client = await hass_ws_client() - await client.send_json( - { - "id": 1, - "type": "history/stream", - "start_time": now.isoformat(), - "include_start_time_state": True, - "significant_changes_only": False, - "no_attributes": False, - "minimal_response": True, - } - ) - response = await client.receive_json() - assert response["success"] - assert response["id"] == 1 - assert response["type"] == "result" - - response = await client.receive_json() - first_end_time = sensor_two_last_updated.timestamp() - - assert response == { - "event": { - "end_time": first_end_time, - "start_time": now.timestamp(), - "states": { - "sensor.one": [ - { - "a": {"any": "attr"}, - "lu": sensor_one_last_updated.timestamp(), - "s": "on", - } - ], - "sensor.two": [ - { - "a": {"any": "attr"}, - "lu": sensor_two_last_updated.timestamp(), - "s": "off", - } - ], - }, - }, - "id": 1, - "type": "event", - } - - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.one", "on", attributes={"diff": "attr"}) - hass.states.async_set("sensor.two", "two", attributes={"any": "attr"}) - # Only sensor.two has changed - sensor_one_last_updated = hass.states.get("sensor.one").last_updated - sensor_two_last_updated = hass.states.get("sensor.two").last_updated - hass.states.async_remove("sensor.one") - hass.states.async_remove("sensor.two") - await async_recorder_block_till_done(hass) - - response = await client.receive_json() - assert response == { - "event": { - "states": { - "sensor.two": [ - { - "lu": sensor_two_last_updated.timestamp(), - "s": "two", - "a": {"any": "attr"}, - } - ], - }, - }, - "id": 1, - "type": "event", - } - - -async def test_history_stream_live_no_attributes(recorder_mock, hass, hass_ws_client): - """Test history stream with history and live data and no_attributes.""" - now = dt_util.utcnow() - sort_order = ["sensor.two", "sensor.four", "sensor.one"] - await async_setup_component( - hass, - "history", - { - history.DOMAIN: { - history.CONF_ORDER: True, - CONF_INCLUDE: { - CONF_ENTITIES: sort_order, - CONF_DOMAINS: ["sensor"], - }, - } - }, - ) - await async_setup_component(hass, "sensor", {}) - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.one", "on", attributes={"any": "attr"}) - sensor_one_last_updated = hass.states.get("sensor.one").last_updated - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.two", "off", attributes={"any": "attr"}) - sensor_two_last_updated = hass.states.get("sensor.two").last_updated - await async_recorder_block_till_done(hass) - hass.states.async_set("switch.excluded", "off", attributes={"any": "again"}) - await async_wait_recording_done(hass) - - await async_wait_recording_done(hass) - - client = await hass_ws_client() - await client.send_json( - { - "id": 1, - "type": "history/stream", - "start_time": now.isoformat(), - "include_start_time_state": True, - "significant_changes_only": False, - "no_attributes": True, - "minimal_response": False, - } - ) - response = await client.receive_json() - assert response["success"] - assert response["id"] == 1 - assert response["type"] == "result" - - response = await client.receive_json() - first_end_time = sensor_two_last_updated.timestamp() - - assert response == { - "event": { - "end_time": first_end_time, - "start_time": now.timestamp(), - "states": { - "sensor.one": [ - {"a": {}, "lu": sensor_one_last_updated.timestamp(), "s": "on"} - ], - "sensor.two": [ - {"a": {}, "lu": sensor_two_last_updated.timestamp(), "s": "off"} - ], - }, - }, - "id": 1, - "type": "event", - } - - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.one", "one", attributes={"diff": "attr"}) - hass.states.async_set("sensor.two", "two", attributes={"diff": "attr"}) - await async_recorder_block_till_done(hass) - - sensor_one_last_updated = hass.states.get("sensor.one").last_updated - sensor_two_last_updated = hass.states.get("sensor.two").last_updated - response = await client.receive_json() - assert response == { - "event": { - "states": { - "sensor.one": [{"lu": sensor_one_last_updated.timestamp(), "s": "one"}], - "sensor.two": [{"lu": sensor_two_last_updated.timestamp(), "s": "two"}], - }, - }, - "id": 1, - "type": "event", - } - - -async def test_history_stream_live_no_attributes_minimal_response_specific_entities( - recorder_mock, hass, hass_ws_client -): - """Test history stream with history and live data and no_attributes and minimal_response with specific entities.""" - now = dt_util.utcnow() - wanted_entities = ["sensor.two", "sensor.four", "sensor.one"] - await async_setup_component( - hass, - "history", - {history.DOMAIN: {}}, - ) - await async_setup_component(hass, "sensor", {}) - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.one", "on", attributes={"any": "attr"}) - sensor_one_last_updated = hass.states.get("sensor.one").last_updated - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.two", "off", attributes={"any": "attr"}) - sensor_two_last_updated = hass.states.get("sensor.two").last_updated - await async_recorder_block_till_done(hass) - hass.states.async_set("switch.excluded", "off", attributes={"any": "again"}) - await async_wait_recording_done(hass) - - await async_wait_recording_done(hass) - - client = await hass_ws_client() - await client.send_json( - { - "id": 1, - "type": "history/stream", - "entity_ids": wanted_entities, - "start_time": now.isoformat(), - "include_start_time_state": True, - "significant_changes_only": False, - "no_attributes": True, - "minimal_response": True, - } - ) - response = await client.receive_json() - assert response["success"] - assert response["id"] == 1 - assert response["type"] == "result" - - response = await client.receive_json() - first_end_time = sensor_two_last_updated.timestamp() - - assert response == { - "event": { - "end_time": first_end_time, - "start_time": now.timestamp(), - "states": { - "sensor.one": [ - {"a": {}, "lu": sensor_one_last_updated.timestamp(), "s": "on"} - ], - "sensor.two": [ - {"a": {}, "lu": sensor_two_last_updated.timestamp(), "s": "off"} - ], - }, - }, - "id": 1, - "type": "event", - } - - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.one", "one", attributes={"any": "attr"}) - hass.states.async_set("sensor.two", "two", attributes={"any": "attr"}) - await async_recorder_block_till_done(hass) - - sensor_one_last_updated = hass.states.get("sensor.one").last_updated - sensor_two_last_updated = hass.states.get("sensor.two").last_updated - response = await client.receive_json() - assert response == { - "event": { - "states": { - "sensor.one": [{"lu": sensor_one_last_updated.timestamp(), "s": "one"}], - "sensor.two": [{"lu": sensor_two_last_updated.timestamp(), "s": "two"}], - }, - }, - "id": 1, - "type": "event", - } - - -async def test_history_stream_live_with_future_end_time( - recorder_mock, hass, hass_ws_client -): - """Test history stream with history and live data with future end time.""" - now = dt_util.utcnow() - wanted_entities = ["sensor.two", "sensor.four", "sensor.one"] - await async_setup_component( - hass, - "history", - {history.DOMAIN: {}}, - ) - await async_setup_component(hass, "sensor", {}) - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.one", "on", attributes={"any": "attr"}) - sensor_one_last_updated = hass.states.get("sensor.one").last_updated - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.two", "off", attributes={"any": "attr"}) - sensor_two_last_updated = hass.states.get("sensor.two").last_updated - await async_recorder_block_till_done(hass) - hass.states.async_set("switch.excluded", "off", attributes={"any": "again"}) - await async_wait_recording_done(hass) - - await async_wait_recording_done(hass) - - future = now + timedelta(seconds=10) - - client = await hass_ws_client() - init_listeners = hass.bus.async_listeners() - await client.send_json( - { - "id": 1, - "type": "history/stream", - "entity_ids": wanted_entities, - "start_time": now.isoformat(), - "end_time": future.isoformat(), - "include_start_time_state": True, - "significant_changes_only": False, - "no_attributes": True, - "minimal_response": True, - } - ) - response = await client.receive_json() - assert response["success"] - assert response["id"] == 1 - assert response["type"] == "result" - - response = await client.receive_json() - first_end_time = sensor_two_last_updated.timestamp() - - assert response == { - "event": { - "end_time": first_end_time, - "start_time": now.timestamp(), - "states": { - "sensor.one": [ - {"a": {}, "lu": sensor_one_last_updated.timestamp(), "s": "on"} - ], - "sensor.two": [ - {"a": {}, "lu": sensor_two_last_updated.timestamp(), "s": "off"} - ], - }, - }, - "id": 1, - "type": "event", - } - - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.one", "one", attributes={"any": "attr"}) - hass.states.async_set("sensor.two", "two", attributes={"any": "attr"}) - await async_recorder_block_till_done(hass) - - sensor_one_last_updated = hass.states.get("sensor.one").last_updated - sensor_two_last_updated = hass.states.get("sensor.two").last_updated - response = await client.receive_json() - assert response == { - "event": { - "states": { - "sensor.one": [{"lu": sensor_one_last_updated.timestamp(), "s": "one"}], - "sensor.two": [{"lu": sensor_two_last_updated.timestamp(), "s": "two"}], - }, - }, - "id": 1, - "type": "event", - } - - async_fire_time_changed(hass, future + timedelta(seconds=1)) - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.two", "future", attributes={"any": "attr"}) - # Check our listener got unsubscribed - await async_wait_recording_done(hass) - await async_recorder_block_till_done(hass) - assert listeners_without_writes( - hass.bus.async_listeners() - ) == listeners_without_writes(init_listeners) - - -@pytest.mark.parametrize("include_start_time_state", (True, False)) -async def test_history_stream_before_history_starts( - recorder_mock, hass, hass_ws_client, include_start_time_state -): - """Test history stream before we have history.""" - sort_order = ["sensor.two", "sensor.four", "sensor.one"] - await async_setup_component( - hass, - "history", - { - history.DOMAIN: { - history.CONF_ORDER: True, - CONF_INCLUDE: { - CONF_ENTITIES: sort_order, - CONF_DOMAINS: ["sensor"], - }, - } - }, - ) - await async_setup_component(hass, "sensor", {}) - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.one", "on", attributes={"any": "attr"}) - await async_recorder_block_till_done(hass) - await async_wait_recording_done(hass) - far_past = dt_util.utcnow() - timedelta(days=1000) - far_past_end = far_past + timedelta(seconds=10) - - client = await hass_ws_client() - await client.send_json( - { - "id": 1, - "type": "history/stream", - "entity_ids": ["sensor.one"], - "start_time": far_past.isoformat(), - "end_time": far_past_end.isoformat(), - "include_start_time_state": include_start_time_state, - "significant_changes_only": False, - "no_attributes": True, - "minimal_response": True, - } - ) - response = await client.receive_json() - assert response["success"] - assert response["id"] == 1 - assert response["type"] == "result" - - response = await client.receive_json() - assert response == { - "event": { - "end_time": far_past_end.timestamp(), - "start_time": far_past.timestamp(), - "states": {}, - }, - "id": 1, - "type": "event", - } - - -async def test_history_stream_for_entity_with_no_possible_changes( - recorder_mock, hass, hass_ws_client -): - """Test history stream for future with no possible changes where end time is less than or equal to now.""" - sort_order = ["sensor.two", "sensor.four", "sensor.one"] - await async_setup_component( - hass, - "history", - { - history.DOMAIN: { - history.CONF_ORDER: True, - CONF_INCLUDE: { - CONF_ENTITIES: sort_order, - CONF_DOMAINS: ["sensor"], - }, - } - }, - ) - await async_setup_component(hass, "sensor", {}) - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.one", "on", attributes={"any": "attr"}) - await async_recorder_block_till_done(hass) - await async_wait_recording_done(hass) - - last_updated = hass.states.get("sensor.one").last_updated - start_time = last_updated + timedelta(seconds=10) - end_time = start_time + timedelta(seconds=10) - - with freeze_time(end_time): - client = await hass_ws_client() - await client.send_json( - { - "id": 1, - "type": "history/stream", - "entity_ids": ["sensor.one"], - "start_time": start_time.isoformat(), - "end_time": end_time.isoformat(), - "include_start_time_state": False, - "significant_changes_only": False, - "no_attributes": True, - "minimal_response": True, - } - ) - response = await client.receive_json() - assert response["success"] - assert response["id"] == 1 - assert response["type"] == "result" - - response = await client.receive_json() - assert response == { - "event": { - "end_time": end_time.timestamp(), - "start_time": start_time.timestamp(), - "states": {}, - }, - "id": 1, - "type": "event", - } - - -async def test_overflow_queue(recorder_mock, hass, hass_ws_client): - """Test overflowing the history stream queue.""" - now = dt_util.utcnow() - wanted_entities = ["sensor.two", "sensor.four", "sensor.one"] - with patch.object(history, "MAX_PENDING_HISTORY_STATES", 5): - await async_setup_component( - hass, - "history", - {history.DOMAIN: {}}, - ) - await async_setup_component(hass, "sensor", {}) - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.one", "on", attributes={"any": "attr"}) - sensor_one_last_updated = hass.states.get("sensor.one").last_updated - await async_recorder_block_till_done(hass) - hass.states.async_set("sensor.two", "off", attributes={"any": "attr"}) - sensor_two_last_updated = hass.states.get("sensor.two").last_updated - await async_recorder_block_till_done(hass) - hass.states.async_set("switch.excluded", "off", attributes={"any": "again"}) - await async_wait_recording_done(hass) - - await async_wait_recording_done(hass) - - client = await hass_ws_client() - init_listeners = hass.bus.async_listeners() - - await client.send_json( - { - "id": 1, - "type": "history/stream", - "entity_ids": wanted_entities, - "start_time": now.isoformat(), - "include_start_time_state": True, - "significant_changes_only": False, - "no_attributes": True, - "minimal_response": True, - } - ) - response = await client.receive_json() - assert response["success"] - assert response["id"] == 1 - assert response["type"] == "result" - - response = await client.receive_json() - first_end_time = sensor_two_last_updated.timestamp() - - assert response == { - "event": { - "end_time": first_end_time, - "start_time": now.timestamp(), - "states": { - "sensor.one": [ - {"a": {}, "lu": sensor_one_last_updated.timestamp(), "s": "on"} - ], - "sensor.two": [ - {"a": {}, "lu": sensor_two_last_updated.timestamp(), "s": "off"} - ], - }, - }, - "id": 1, - "type": "event", - } - - await async_recorder_block_till_done(hass) - # Overflow the queue - for val in range(10): - hass.states.async_set("sensor.one", str(val), attributes={"any": "attr"}) - hass.states.async_set("sensor.two", str(val), attributes={"any": "attr"}) - await async_recorder_block_till_done(hass) - - assert listeners_without_writes( - hass.bus.async_listeners() - ) == listeners_without_writes(init_listeners) diff --git a/tests/components/history/test_websocket_api.py b/tests/components/history/test_websocket_api.py new file mode 100644 index 00000000000..e481abe0f3e --- /dev/null +++ b/tests/components/history/test_websocket_api.py @@ -0,0 +1,1617 @@ +"""The tests the History component websocket_api.""" +# pylint: disable=protected-access,invalid-name +from datetime import timedelta +from unittest.mock import patch + +import async_timeout +from freezegun import freeze_time +import pytest + +from homeassistant.components import history +from homeassistant.components.history import websocket_api +from homeassistant.const import ( + CONF_DOMAINS, + CONF_ENTITIES, + CONF_INCLUDE, + EVENT_HOMEASSISTANT_FINAL_WRITE, +) +from homeassistant.setup import async_setup_component +import homeassistant.util.dt as dt_util + +from tests.common import async_fire_time_changed +from tests.components.recorder.common import ( + async_recorder_block_till_done, + async_wait_recording_done, +) + + +def listeners_without_writes(listeners: dict[str, int]) -> dict[str, int]: + """Return listeners without final write listeners since we are not testing for these.""" + return { + key: value + for key, value in listeners.items() + if key != EVENT_HOMEASSISTANT_FINAL_WRITE + } + + +@pytest.mark.usefixtures("hass_history") +def test_setup(): + """Test setup method of history.""" + # Verification occurs in the fixture + + +async def test_history_during_period(recorder_mock, hass, hass_ws_client): + """Test history_during_period.""" + now = dt_util.utcnow() + + await async_setup_component(hass, "history", {}) + await async_setup_component(hass, "sensor", {}) + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.test", "on", attributes={"any": "attr"}) + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.test", "off", attributes={"any": "attr"}) + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.test", "off", attributes={"any": "changed"}) + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.test", "off", attributes={"any": "again"}) + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.test", "on", attributes={"any": "attr"}) + await async_wait_recording_done(hass) + + await async_wait_recording_done(hass) + + client = await hass_ws_client() + await client.send_json( + { + "id": 1, + "type": "history/history_during_period", + "start_time": now.isoformat(), + "end_time": now.isoformat(), + "entity_ids": ["sensor.test"], + "include_start_time_state": True, + "significant_changes_only": False, + "no_attributes": True, + } + ) + response = await client.receive_json() + assert response["success"] + assert response["result"] == {} + + await client.send_json( + { + "id": 2, + "type": "history/history_during_period", + "start_time": now.isoformat(), + "entity_ids": ["sensor.test"], + "include_start_time_state": True, + "significant_changes_only": False, + "no_attributes": True, + "minimal_response": True, + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 2 + + sensor_test_history = response["result"]["sensor.test"] + assert len(sensor_test_history) == 3 + + assert sensor_test_history[0]["s"] == "on" + assert sensor_test_history[0]["a"] == {} + assert isinstance(sensor_test_history[0]["lu"], float) + assert "lc" not in sensor_test_history[0] # skipped if the same a last_updated (lu) + + assert "a" not in sensor_test_history[1] + assert sensor_test_history[1]["s"] == "off" + assert isinstance(sensor_test_history[1]["lu"], float) + assert "lc" not in sensor_test_history[1] # skipped if the same a last_updated (lu) + + assert sensor_test_history[2]["s"] == "on" + assert "a" not in sensor_test_history[2] + + await client.send_json( + { + "id": 3, + "type": "history/history_during_period", + "start_time": now.isoformat(), + "entity_ids": ["sensor.test"], + "include_start_time_state": True, + "significant_changes_only": False, + "no_attributes": False, + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 3 + sensor_test_history = response["result"]["sensor.test"] + + assert len(sensor_test_history) == 5 + + assert sensor_test_history[0]["s"] == "on" + assert sensor_test_history[0]["a"] == {"any": "attr"} + assert isinstance(sensor_test_history[0]["lu"], float) + assert "lc" not in sensor_test_history[0] # skipped if the same a last_updated (lu) + + assert sensor_test_history[1]["s"] == "off" + assert isinstance(sensor_test_history[1]["lu"], float) + assert "lc" not in sensor_test_history[1] # skipped if the same a last_updated (lu) + assert sensor_test_history[1]["a"] == {"any": "attr"} + + assert sensor_test_history[4]["s"] == "on" + assert sensor_test_history[4]["a"] == {"any": "attr"} + + await client.send_json( + { + "id": 4, + "type": "history/history_during_period", + "start_time": now.isoformat(), + "entity_ids": ["sensor.test"], + "include_start_time_state": True, + "significant_changes_only": True, + "no_attributes": False, + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 4 + sensor_test_history = response["result"]["sensor.test"] + + assert len(sensor_test_history) == 3 + + assert sensor_test_history[0]["s"] == "on" + assert sensor_test_history[0]["a"] == {"any": "attr"} + assert isinstance(sensor_test_history[0]["lu"], float) + assert "lc" not in sensor_test_history[0] # skipped if the same a last_updated (lu) + + assert sensor_test_history[1]["s"] == "off" + assert isinstance(sensor_test_history[1]["lu"], float) + assert "lc" not in sensor_test_history[1] # skipped if the same a last_updated (lu) + assert sensor_test_history[1]["a"] == {"any": "attr"} + + assert sensor_test_history[2]["s"] == "on" + assert sensor_test_history[2]["a"] == {"any": "attr"} + + +async def test_history_during_period_impossible_conditions( + recorder_mock, hass, hass_ws_client +): + """Test history_during_period returns when condition cannot be true.""" + await async_setup_component(hass, "history", {}) + await async_setup_component(hass, "sensor", {}) + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.test", "on", attributes={"any": "attr"}) + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.test", "off", attributes={"any": "attr"}) + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.test", "off", attributes={"any": "changed"}) + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.test", "off", attributes={"any": "again"}) + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.test", "on", attributes={"any": "attr"}) + await async_wait_recording_done(hass) + + await async_wait_recording_done(hass) + + after = dt_util.utcnow() + + client = await hass_ws_client() + await client.send_json( + { + "id": 1, + "type": "history/history_during_period", + "start_time": after.isoformat(), + "end_time": after.isoformat(), + "entity_ids": ["sensor.test"], + "include_start_time_state": False, + "significant_changes_only": False, + "no_attributes": True, + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 1 + assert response["result"] == {} + + future = dt_util.utcnow() + timedelta(hours=10) + + await client.send_json( + { + "id": 2, + "type": "history/history_during_period", + "start_time": future.isoformat(), + "entity_ids": ["sensor.test"], + "include_start_time_state": True, + "significant_changes_only": True, + "no_attributes": True, + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 2 + assert response["result"] == {} + + +@pytest.mark.parametrize( + "time_zone", ["UTC", "Europe/Berlin", "America/Chicago", "US/Hawaii"] +) +async def test_history_during_period_significant_domain( + time_zone, recorder_mock, hass, hass_ws_client +): + """Test history_during_period with climate domain.""" + hass.config.set_time_zone(time_zone) + now = dt_util.utcnow() + + await async_setup_component(hass, "history", {}) + await async_setup_component(hass, "sensor", {}) + await async_recorder_block_till_done(hass) + hass.states.async_set("climate.test", "on", attributes={"temperature": "1"}) + await async_recorder_block_till_done(hass) + hass.states.async_set("climate.test", "off", attributes={"temperature": "2"}) + await async_recorder_block_till_done(hass) + hass.states.async_set("climate.test", "off", attributes={"temperature": "3"}) + await async_recorder_block_till_done(hass) + hass.states.async_set("climate.test", "off", attributes={"temperature": "4"}) + await async_recorder_block_till_done(hass) + hass.states.async_set("climate.test", "on", attributes={"temperature": "5"}) + await async_wait_recording_done(hass) + + await async_wait_recording_done(hass) + + client = await hass_ws_client() + await client.send_json( + { + "id": 1, + "type": "history/history_during_period", + "start_time": now.isoformat(), + "end_time": now.isoformat(), + "entity_ids": ["climate.test"], + "include_start_time_state": True, + "significant_changes_only": False, + "no_attributes": True, + } + ) + response = await client.receive_json() + assert response["success"] + assert response["result"] == {} + + await client.send_json( + { + "id": 2, + "type": "history/history_during_period", + "start_time": now.isoformat(), + "entity_ids": ["climate.test"], + "include_start_time_state": True, + "significant_changes_only": False, + "no_attributes": True, + "minimal_response": True, + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 2 + + sensor_test_history = response["result"]["climate.test"] + assert len(sensor_test_history) == 5 + + assert sensor_test_history[0]["s"] == "on" + assert sensor_test_history[0]["a"] == {} + assert isinstance(sensor_test_history[0]["lu"], float) + assert "lc" not in sensor_test_history[0] # skipped if the same a last_updated (lu) + + assert "a" in sensor_test_history[1] + assert sensor_test_history[1]["s"] == "off" + assert "lc" not in sensor_test_history[1] # skipped if the same a last_updated (lu) + + assert sensor_test_history[4]["s"] == "on" + assert sensor_test_history[4]["a"] == {} + + await client.send_json( + { + "id": 3, + "type": "history/history_during_period", + "start_time": now.isoformat(), + "entity_ids": ["climate.test"], + "include_start_time_state": True, + "significant_changes_only": False, + "no_attributes": False, + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 3 + sensor_test_history = response["result"]["climate.test"] + + assert len(sensor_test_history) == 5 + + assert sensor_test_history[0]["s"] == "on" + assert sensor_test_history[0]["a"] == {"temperature": "1"} + assert isinstance(sensor_test_history[0]["lu"], float) + assert "lc" not in sensor_test_history[0] # skipped if the same a last_updated (lu) + + assert sensor_test_history[1]["s"] == "off" + assert isinstance(sensor_test_history[1]["lu"], float) + assert "lc" not in sensor_test_history[1] # skipped if the same a last_updated (lu) + assert sensor_test_history[1]["a"] == {"temperature": "2"} + + assert sensor_test_history[4]["s"] == "on" + assert sensor_test_history[4]["a"] == {"temperature": "5"} + + await client.send_json( + { + "id": 4, + "type": "history/history_during_period", + "start_time": now.isoformat(), + "entity_ids": ["climate.test"], + "include_start_time_state": True, + "significant_changes_only": True, + "no_attributes": False, + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 4 + sensor_test_history = response["result"]["climate.test"] + + assert len(sensor_test_history) == 5 + + assert sensor_test_history[0]["s"] == "on" + assert sensor_test_history[0]["a"] == {"temperature": "1"} + assert isinstance(sensor_test_history[0]["lu"], float) + assert "lc" not in sensor_test_history[0] # skipped if the same a last_updated (lu) + + assert sensor_test_history[1]["s"] == "off" + assert isinstance(sensor_test_history[1]["lu"], float) + assert "lc" not in sensor_test_history[1] # skipped if the same a last_updated (lu) + assert sensor_test_history[1]["a"] == {"temperature": "2"} + + assert sensor_test_history[2]["s"] == "off" + assert sensor_test_history[2]["a"] == {"temperature": "3"} + + assert sensor_test_history[3]["s"] == "off" + assert sensor_test_history[3]["a"] == {"temperature": "4"} + + assert sensor_test_history[4]["s"] == "on" + assert sensor_test_history[4]["a"] == {"temperature": "5"} + + # Test we impute the state time state + later = dt_util.utcnow() + await client.send_json( + { + "id": 5, + "type": "history/history_during_period", + "start_time": later.isoformat(), + "entity_ids": ["climate.test"], + "include_start_time_state": True, + "significant_changes_only": True, + "no_attributes": False, + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 5 + sensor_test_history = response["result"]["climate.test"] + + assert len(sensor_test_history) == 1 + + assert sensor_test_history[0]["s"] == "on" + assert sensor_test_history[0]["a"] == {"temperature": "5"} + assert sensor_test_history[0]["lu"] == later.timestamp() + assert "lc" not in sensor_test_history[0] # skipped if the same a last_updated (lu) + + +async def test_history_during_period_bad_start_time( + recorder_mock, hass, hass_ws_client +): + """Test history_during_period bad state time.""" + await async_setup_component( + hass, + "history", + {"history": {}}, + ) + + client = await hass_ws_client() + await client.send_json( + { + "id": 1, + "type": "history/history_during_period", + "start_time": "cats", + } + ) + response = await client.receive_json() + assert not response["success"] + assert response["error"]["code"] == "invalid_start_time" + + +async def test_history_during_period_bad_end_time(recorder_mock, hass, hass_ws_client): + """Test history_during_period bad end time.""" + now = dt_util.utcnow() + + await async_setup_component( + hass, + "history", + {"history": {}}, + ) + + client = await hass_ws_client() + await client.send_json( + { + "id": 1, + "type": "history/history_during_period", + "start_time": now.isoformat(), + "end_time": "dogs", + } + ) + response = await client.receive_json() + assert not response["success"] + assert response["error"]["code"] == "invalid_end_time" + + +async def test_history_during_period_with_use_include_order( + recorder_mock, hass, hass_ws_client +): + """Test history_during_period.""" + now = dt_util.utcnow() + sort_order = ["sensor.two", "sensor.four", "sensor.one"] + await async_setup_component( + hass, + "history", + { + history.DOMAIN: { + history.CONF_ORDER: True, + CONF_INCLUDE: { + CONF_ENTITIES: sort_order, + CONF_DOMAINS: ["sensor"], + }, + } + }, + ) + await async_setup_component(hass, "sensor", {}) + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.one", "on", attributes={"any": "attr"}) + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.two", "off", attributes={"any": "attr"}) + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.three", "off", attributes={"any": "changed"}) + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.four", "off", attributes={"any": "again"}) + await async_recorder_block_till_done(hass) + hass.states.async_set("switch.excluded", "off", attributes={"any": "again"}) + await async_wait_recording_done(hass) + + await async_wait_recording_done(hass) + + client = await hass_ws_client() + await client.send_json( + { + "id": 1, + "type": "history/history_during_period", + "start_time": now.isoformat(), + "include_start_time_state": True, + "significant_changes_only": False, + "no_attributes": True, + "minimal_response": True, + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 1 + + assert list(response["result"]) == [ + *sort_order, + "sensor.three", + ] + + +async def test_history_stream_historical_only(recorder_mock, hass, hass_ws_client): + """Test history stream.""" + now = dt_util.utcnow() + sort_order = ["sensor.two", "sensor.four", "sensor.one"] + await async_setup_component( + hass, + "history", + { + history.DOMAIN: { + history.CONF_ORDER: True, + CONF_INCLUDE: { + CONF_ENTITIES: sort_order, + CONF_DOMAINS: ["sensor"], + }, + } + }, + ) + await async_setup_component(hass, "sensor", {}) + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.one", "on", attributes={"any": "attr"}) + sensor_one_last_updated = hass.states.get("sensor.one").last_updated + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.two", "off", attributes={"any": "attr"}) + sensor_two_last_updated = hass.states.get("sensor.two").last_updated + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.three", "off", attributes={"any": "changed"}) + sensor_three_last_updated = hass.states.get("sensor.three").last_updated + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.four", "off", attributes={"any": "again"}) + sensor_four_last_updated = hass.states.get("sensor.four").last_updated + await async_recorder_block_till_done(hass) + hass.states.async_set("switch.excluded", "off", attributes={"any": "again"}) + await async_wait_recording_done(hass) + + await async_wait_recording_done(hass) + end_time = dt_util.utcnow() + + client = await hass_ws_client() + await client.send_json( + { + "id": 1, + "type": "history/stream", + "start_time": now.isoformat(), + "end_time": end_time.isoformat(), + "include_start_time_state": True, + "significant_changes_only": False, + "no_attributes": True, + "minimal_response": True, + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 1 + assert response["type"] == "result" + + response = await client.receive_json() + + assert response == { + "event": { + "end_time": sensor_four_last_updated.timestamp(), + "start_time": now.timestamp(), + "states": { + "sensor.four": [ + {"a": {}, "lu": sensor_four_last_updated.timestamp(), "s": "off"} + ], + "sensor.one": [ + {"a": {}, "lu": sensor_one_last_updated.timestamp(), "s": "on"} + ], + "sensor.three": [ + {"a": {}, "lu": sensor_three_last_updated.timestamp(), "s": "off"} + ], + "sensor.two": [ + {"a": {}, "lu": sensor_two_last_updated.timestamp(), "s": "off"} + ], + }, + }, + "id": 1, + "type": "event", + } + + +async def test_history_stream_significant_domain_historical_only( + recorder_mock, hass, hass_ws_client +): + """Test the stream with climate domain with historical states only.""" + now = dt_util.utcnow() + + await async_setup_component(hass, "history", {}) + await async_setup_component(hass, "sensor", {}) + await async_recorder_block_till_done(hass) + hass.states.async_set("climate.test", "on", attributes={"temperature": "1"}) + await async_recorder_block_till_done(hass) + hass.states.async_set("climate.test", "off", attributes={"temperature": "2"}) + await async_recorder_block_till_done(hass) + hass.states.async_set("climate.test", "off", attributes={"temperature": "3"}) + await async_recorder_block_till_done(hass) + hass.states.async_set("climate.test", "off", attributes={"temperature": "4"}) + await async_recorder_block_till_done(hass) + hass.states.async_set("climate.test", "on", attributes={"temperature": "5"}) + await async_wait_recording_done(hass) + + await async_wait_recording_done(hass) + + client = await hass_ws_client() + await client.send_json( + { + "id": 1, + "type": "history/stream", + "start_time": now.isoformat(), + "end_time": now.isoformat(), + "entity_ids": ["climate.test"], + "include_start_time_state": True, + "significant_changes_only": False, + "no_attributes": True, + } + ) + async with async_timeout.timeout(3): + response = await client.receive_json() + assert response["success"] + assert response["id"] == 1 + assert response["type"] == "result" + async with async_timeout.timeout(3): + response = await client.receive_json() + assert response == { + "event": { + "end_time": now.timestamp(), + "start_time": now.timestamp(), + "states": {}, + }, + "id": 1, + "type": "event", + } + + end_time = dt_util.utcnow() + await client.send_json( + { + "id": 2, + "type": "history/stream", + "start_time": now.isoformat(), + "end_time": end_time.isoformat(), + "entity_ids": ["climate.test"], + "include_start_time_state": True, + "significant_changes_only": False, + "no_attributes": True, + "minimal_response": True, + } + ) + async with async_timeout.timeout(3): + response = await client.receive_json() + assert response["success"] + assert response["id"] == 2 + assert response["type"] == "result" + + async with async_timeout.timeout(3): + response = await client.receive_json() + sensor_test_history = response["event"]["states"]["climate.test"] + assert len(sensor_test_history) == 5 + + assert sensor_test_history[0]["s"] == "on" + assert sensor_test_history[0]["a"] == {} + assert isinstance(sensor_test_history[0]["lu"], float) + assert "lc" not in sensor_test_history[0] # skipped if the same a last_updated (lu) + + assert "a" in sensor_test_history[1] + assert sensor_test_history[1]["s"] == "off" + assert "lc" not in sensor_test_history[1] # skipped if the same a last_updated (lu) + + assert sensor_test_history[4]["s"] == "on" + assert sensor_test_history[4]["a"] == {} + + await client.send_json( + { + "id": 3, + "type": "history/stream", + "start_time": now.isoformat(), + "end_time": end_time.isoformat(), + "entity_ids": ["climate.test"], + "include_start_time_state": True, + "significant_changes_only": False, + "no_attributes": False, + } + ) + async with async_timeout.timeout(3): + response = await client.receive_json() + assert response["success"] + assert response["id"] == 3 + assert response["type"] == "result" + + async with async_timeout.timeout(3): + response = await client.receive_json() + sensor_test_history = response["event"]["states"]["climate.test"] + + assert len(sensor_test_history) == 5 + + assert sensor_test_history[0]["s"] == "on" + assert sensor_test_history[0]["a"] == {"temperature": "1"} + assert isinstance(sensor_test_history[0]["lu"], float) + assert "lc" not in sensor_test_history[0] # skipped if the same a last_updated (lu) + + assert sensor_test_history[1]["s"] == "off" + assert isinstance(sensor_test_history[1]["lu"], float) + assert "lc" not in sensor_test_history[1] # skipped if the same a last_updated (lu) + assert sensor_test_history[1]["a"] == {"temperature": "2"} + + assert sensor_test_history[4]["s"] == "on" + assert sensor_test_history[4]["a"] == {"temperature": "5"} + + await client.send_json( + { + "id": 4, + "type": "history/stream", + "start_time": now.isoformat(), + "end_time": end_time.isoformat(), + "entity_ids": ["climate.test"], + "include_start_time_state": True, + "significant_changes_only": True, + "no_attributes": False, + } + ) + async with async_timeout.timeout(3): + response = await client.receive_json() + assert response["success"] + assert response["id"] == 4 + assert response["type"] == "result" + + async with async_timeout.timeout(3): + response = await client.receive_json() + sensor_test_history = response["event"]["states"]["climate.test"] + + assert len(sensor_test_history) == 5 + + assert sensor_test_history[0]["s"] == "on" + assert sensor_test_history[0]["a"] == {"temperature": "1"} + assert isinstance(sensor_test_history[0]["lu"], float) + assert "lc" not in sensor_test_history[0] # skipped if the same a last_updated (lu) + + assert sensor_test_history[1]["s"] == "off" + assert isinstance(sensor_test_history[1]["lu"], float) + assert "lc" not in sensor_test_history[1] # skipped if the same a last_updated (lu) + assert sensor_test_history[1]["a"] == {"temperature": "2"} + + assert sensor_test_history[2]["s"] == "off" + assert sensor_test_history[2]["a"] == {"temperature": "3"} + + assert sensor_test_history[3]["s"] == "off" + assert sensor_test_history[3]["a"] == {"temperature": "4"} + + assert sensor_test_history[4]["s"] == "on" + assert sensor_test_history[4]["a"] == {"temperature": "5"} + + # Test we impute the state time state + later = dt_util.utcnow() + await client.send_json( + { + "id": 5, + "type": "history/stream", + "start_time": later.isoformat(), + "end_time": later.isoformat(), + "entity_ids": ["climate.test"], + "include_start_time_state": True, + "significant_changes_only": True, + "no_attributes": False, + } + ) + async with async_timeout.timeout(3): + response = await client.receive_json() + assert response["success"] + assert response["id"] == 5 + assert response["type"] == "result" + + async with async_timeout.timeout(3): + response = await client.receive_json() + sensor_test_history = response["event"]["states"]["climate.test"] + + assert len(sensor_test_history) == 1 + + assert sensor_test_history[0]["s"] == "on" + assert sensor_test_history[0]["a"] == {"temperature": "5"} + assert sensor_test_history[0]["lu"] == later.timestamp() + assert "lc" not in sensor_test_history[0] # skipped if the same a last_updated (lu) + + +async def test_history_stream_bad_start_time(recorder_mock, hass, hass_ws_client): + """Test history stream bad state time.""" + await async_setup_component( + hass, + "history", + {"history": {}}, + ) + + client = await hass_ws_client() + await client.send_json( + { + "id": 1, + "type": "history/stream", + "start_time": "cats", + } + ) + response = await client.receive_json() + assert not response["success"] + assert response["error"]["code"] == "invalid_start_time" + + +async def test_history_stream_end_time_before_start_time( + recorder_mock, hass, hass_ws_client +): + """Test history stream with an end_time before the start_time.""" + end_time = dt_util.utcnow() - timedelta(seconds=2) + start_time = dt_util.utcnow() - timedelta(seconds=1) + + await async_setup_component( + hass, + "history", + {"history": {}}, + ) + + client = await hass_ws_client() + await client.send_json( + { + "id": 1, + "type": "history/stream", + "start_time": start_time.isoformat(), + "end_time": end_time.isoformat(), + } + ) + response = await client.receive_json() + assert not response["success"] + assert response["error"]["code"] == "invalid_end_time" + + +async def test_history_stream_bad_end_time(recorder_mock, hass, hass_ws_client): + """Test history stream bad end time.""" + now = dt_util.utcnow() + + await async_setup_component( + hass, + "history", + {"history": {}}, + ) + + client = await hass_ws_client() + await client.send_json( + { + "id": 1, + "type": "history/stream", + "start_time": now.isoformat(), + "end_time": "dogs", + } + ) + response = await client.receive_json() + assert not response["success"] + assert response["error"]["code"] == "invalid_end_time" + + +async def test_history_stream_live_no_attributes_minimal_response( + recorder_mock, hass, hass_ws_client +): + """Test history stream with history and live data and no_attributes and minimal_response.""" + now = dt_util.utcnow() + sort_order = ["sensor.two", "sensor.four", "sensor.one"] + await async_setup_component( + hass, + "history", + { + history.DOMAIN: { + history.CONF_ORDER: True, + CONF_INCLUDE: { + CONF_ENTITIES: sort_order, + CONF_DOMAINS: ["sensor"], + }, + } + }, + ) + await async_setup_component(hass, "sensor", {}) + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.one", "on", attributes={"any": "attr"}) + sensor_one_last_updated = hass.states.get("sensor.one").last_updated + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.two", "off", attributes={"any": "attr"}) + sensor_two_last_updated = hass.states.get("sensor.two").last_updated + await async_recorder_block_till_done(hass) + hass.states.async_set("switch.excluded", "off", attributes={"any": "again"}) + await async_wait_recording_done(hass) + + await async_wait_recording_done(hass) + + client = await hass_ws_client() + await client.send_json( + { + "id": 1, + "type": "history/stream", + "start_time": now.isoformat(), + "include_start_time_state": True, + "significant_changes_only": False, + "no_attributes": True, + "minimal_response": True, + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 1 + assert response["type"] == "result" + + response = await client.receive_json() + first_end_time = sensor_two_last_updated.timestamp() + + assert response == { + "event": { + "end_time": first_end_time, + "start_time": now.timestamp(), + "states": { + "sensor.one": [ + {"a": {}, "lu": sensor_one_last_updated.timestamp(), "s": "on"} + ], + "sensor.two": [ + {"a": {}, "lu": sensor_two_last_updated.timestamp(), "s": "off"} + ], + }, + }, + "id": 1, + "type": "event", + } + + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.one", "one", attributes={"any": "attr"}) + hass.states.async_set("sensor.two", "two", attributes={"any": "attr"}) + await async_recorder_block_till_done(hass) + + sensor_one_last_updated = hass.states.get("sensor.one").last_updated + sensor_two_last_updated = hass.states.get("sensor.two").last_updated + response = await client.receive_json() + assert response == { + "event": { + "states": { + "sensor.one": [{"lu": sensor_one_last_updated.timestamp(), "s": "one"}], + "sensor.two": [{"lu": sensor_two_last_updated.timestamp(), "s": "two"}], + }, + }, + "id": 1, + "type": "event", + } + + +async def test_history_stream_live(recorder_mock, hass, hass_ws_client): + """Test history stream with history and live data.""" + now = dt_util.utcnow() + sort_order = ["sensor.two", "sensor.four", "sensor.one"] + await async_setup_component( + hass, + "history", + { + history.DOMAIN: { + history.CONF_ORDER: True, + CONF_INCLUDE: { + CONF_ENTITIES: sort_order, + CONF_DOMAINS: ["sensor"], + }, + } + }, + ) + await async_setup_component(hass, "sensor", {}) + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.one", "on", attributes={"any": "attr"}) + sensor_one_last_updated = hass.states.get("sensor.one").last_updated + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.two", "off", attributes={"any": "attr"}) + sensor_two_last_updated = hass.states.get("sensor.two").last_updated + await async_recorder_block_till_done(hass) + hass.states.async_set("switch.excluded", "off", attributes={"any": "again"}) + await async_wait_recording_done(hass) + + await async_wait_recording_done(hass) + + client = await hass_ws_client() + await client.send_json( + { + "id": 1, + "type": "history/stream", + "start_time": now.isoformat(), + "include_start_time_state": True, + "significant_changes_only": False, + "no_attributes": False, + "minimal_response": False, + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 1 + assert response["type"] == "result" + + response = await client.receive_json() + first_end_time = sensor_two_last_updated.timestamp() + + assert response == { + "event": { + "end_time": first_end_time, + "start_time": now.timestamp(), + "states": { + "sensor.one": [ + { + "a": {"any": "attr"}, + "lu": sensor_one_last_updated.timestamp(), + "s": "on", + } + ], + "sensor.two": [ + { + "a": {"any": "attr"}, + "lu": sensor_two_last_updated.timestamp(), + "s": "off", + } + ], + }, + }, + "id": 1, + "type": "event", + } + + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.one", "on", attributes={"diff": "attr"}) + hass.states.async_set("sensor.two", "two", attributes={"any": "attr"}) + await async_recorder_block_till_done(hass) + + sensor_one_last_updated = hass.states.get("sensor.one").last_updated + sensor_one_last_changed = hass.states.get("sensor.one").last_changed + sensor_two_last_updated = hass.states.get("sensor.two").last_updated + response = await client.receive_json() + assert response == { + "event": { + "states": { + "sensor.one": [ + { + "lc": sensor_one_last_changed.timestamp(), + "lu": sensor_one_last_updated.timestamp(), + "s": "on", + "a": {"diff": "attr"}, + } + ], + "sensor.two": [ + { + "lu": sensor_two_last_updated.timestamp(), + "s": "two", + "a": {"any": "attr"}, + } + ], + }, + }, + "id": 1, + "type": "event", + } + + +async def test_history_stream_live_minimal_response( + recorder_mock, hass, hass_ws_client +): + """Test history stream with history and live data and minimal_response.""" + now = dt_util.utcnow() + sort_order = ["sensor.two", "sensor.four", "sensor.one"] + await async_setup_component( + hass, + "history", + { + history.DOMAIN: { + history.CONF_ORDER: True, + CONF_INCLUDE: { + CONF_ENTITIES: sort_order, + CONF_DOMAINS: ["sensor"], + }, + } + }, + ) + await async_setup_component(hass, "sensor", {}) + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.one", "on", attributes={"any": "attr"}) + sensor_one_last_updated = hass.states.get("sensor.one").last_updated + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.two", "off", attributes={"any": "attr"}) + sensor_two_last_updated = hass.states.get("sensor.two").last_updated + await async_recorder_block_till_done(hass) + hass.states.async_set("switch.excluded", "off", attributes={"any": "again"}) + await async_wait_recording_done(hass) + + await async_wait_recording_done(hass) + + client = await hass_ws_client() + await client.send_json( + { + "id": 1, + "type": "history/stream", + "start_time": now.isoformat(), + "include_start_time_state": True, + "significant_changes_only": False, + "no_attributes": False, + "minimal_response": True, + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 1 + assert response["type"] == "result" + + response = await client.receive_json() + first_end_time = sensor_two_last_updated.timestamp() + + assert response == { + "event": { + "end_time": first_end_time, + "start_time": now.timestamp(), + "states": { + "sensor.one": [ + { + "a": {"any": "attr"}, + "lu": sensor_one_last_updated.timestamp(), + "s": "on", + } + ], + "sensor.two": [ + { + "a": {"any": "attr"}, + "lu": sensor_two_last_updated.timestamp(), + "s": "off", + } + ], + }, + }, + "id": 1, + "type": "event", + } + + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.one", "on", attributes={"diff": "attr"}) + hass.states.async_set("sensor.two", "two", attributes={"any": "attr"}) + # Only sensor.two has changed + sensor_one_last_updated = hass.states.get("sensor.one").last_updated + sensor_two_last_updated = hass.states.get("sensor.two").last_updated + hass.states.async_remove("sensor.one") + hass.states.async_remove("sensor.two") + await async_recorder_block_till_done(hass) + + response = await client.receive_json() + assert response == { + "event": { + "states": { + "sensor.two": [ + { + "lu": sensor_two_last_updated.timestamp(), + "s": "two", + "a": {"any": "attr"}, + } + ], + }, + }, + "id": 1, + "type": "event", + } + + +async def test_history_stream_live_no_attributes(recorder_mock, hass, hass_ws_client): + """Test history stream with history and live data and no_attributes.""" + now = dt_util.utcnow() + sort_order = ["sensor.two", "sensor.four", "sensor.one"] + await async_setup_component( + hass, + "history", + { + history.DOMAIN: { + history.CONF_ORDER: True, + CONF_INCLUDE: { + CONF_ENTITIES: sort_order, + CONF_DOMAINS: ["sensor"], + }, + } + }, + ) + await async_setup_component(hass, "sensor", {}) + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.one", "on", attributes={"any": "attr"}) + sensor_one_last_updated = hass.states.get("sensor.one").last_updated + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.two", "off", attributes={"any": "attr"}) + sensor_two_last_updated = hass.states.get("sensor.two").last_updated + await async_recorder_block_till_done(hass) + hass.states.async_set("switch.excluded", "off", attributes={"any": "again"}) + await async_wait_recording_done(hass) + + await async_wait_recording_done(hass) + + client = await hass_ws_client() + await client.send_json( + { + "id": 1, + "type": "history/stream", + "start_time": now.isoformat(), + "include_start_time_state": True, + "significant_changes_only": False, + "no_attributes": True, + "minimal_response": False, + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 1 + assert response["type"] == "result" + + response = await client.receive_json() + first_end_time = sensor_two_last_updated.timestamp() + + assert response == { + "event": { + "end_time": first_end_time, + "start_time": now.timestamp(), + "states": { + "sensor.one": [ + {"a": {}, "lu": sensor_one_last_updated.timestamp(), "s": "on"} + ], + "sensor.two": [ + {"a": {}, "lu": sensor_two_last_updated.timestamp(), "s": "off"} + ], + }, + }, + "id": 1, + "type": "event", + } + + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.one", "one", attributes={"diff": "attr"}) + hass.states.async_set("sensor.two", "two", attributes={"diff": "attr"}) + await async_recorder_block_till_done(hass) + + sensor_one_last_updated = hass.states.get("sensor.one").last_updated + sensor_two_last_updated = hass.states.get("sensor.two").last_updated + response = await client.receive_json() + assert response == { + "event": { + "states": { + "sensor.one": [{"lu": sensor_one_last_updated.timestamp(), "s": "one"}], + "sensor.two": [{"lu": sensor_two_last_updated.timestamp(), "s": "two"}], + }, + }, + "id": 1, + "type": "event", + } + + +async def test_history_stream_live_no_attributes_minimal_response_specific_entities( + recorder_mock, hass, hass_ws_client +): + """Test history stream with history and live data and no_attributes and minimal_response with specific entities.""" + now = dt_util.utcnow() + wanted_entities = ["sensor.two", "sensor.four", "sensor.one"] + await async_setup_component( + hass, + "history", + {history.DOMAIN: {}}, + ) + await async_setup_component(hass, "sensor", {}) + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.one", "on", attributes={"any": "attr"}) + sensor_one_last_updated = hass.states.get("sensor.one").last_updated + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.two", "off", attributes={"any": "attr"}) + sensor_two_last_updated = hass.states.get("sensor.two").last_updated + await async_recorder_block_till_done(hass) + hass.states.async_set("switch.excluded", "off", attributes={"any": "again"}) + await async_wait_recording_done(hass) + + await async_wait_recording_done(hass) + + client = await hass_ws_client() + await client.send_json( + { + "id": 1, + "type": "history/stream", + "entity_ids": wanted_entities, + "start_time": now.isoformat(), + "include_start_time_state": True, + "significant_changes_only": False, + "no_attributes": True, + "minimal_response": True, + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 1 + assert response["type"] == "result" + + response = await client.receive_json() + first_end_time = sensor_two_last_updated.timestamp() + + assert response == { + "event": { + "end_time": first_end_time, + "start_time": now.timestamp(), + "states": { + "sensor.one": [ + {"a": {}, "lu": sensor_one_last_updated.timestamp(), "s": "on"} + ], + "sensor.two": [ + {"a": {}, "lu": sensor_two_last_updated.timestamp(), "s": "off"} + ], + }, + }, + "id": 1, + "type": "event", + } + + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.one", "one", attributes={"any": "attr"}) + hass.states.async_set("sensor.two", "two", attributes={"any": "attr"}) + await async_recorder_block_till_done(hass) + + sensor_one_last_updated = hass.states.get("sensor.one").last_updated + sensor_two_last_updated = hass.states.get("sensor.two").last_updated + response = await client.receive_json() + assert response == { + "event": { + "states": { + "sensor.one": [{"lu": sensor_one_last_updated.timestamp(), "s": "one"}], + "sensor.two": [{"lu": sensor_two_last_updated.timestamp(), "s": "two"}], + }, + }, + "id": 1, + "type": "event", + } + + +async def test_history_stream_live_with_future_end_time( + recorder_mock, hass, hass_ws_client +): + """Test history stream with history and live data with future end time.""" + now = dt_util.utcnow() + wanted_entities = ["sensor.two", "sensor.four", "sensor.one"] + await async_setup_component( + hass, + "history", + {history.DOMAIN: {}}, + ) + await async_setup_component(hass, "sensor", {}) + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.one", "on", attributes={"any": "attr"}) + sensor_one_last_updated = hass.states.get("sensor.one").last_updated + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.two", "off", attributes={"any": "attr"}) + sensor_two_last_updated = hass.states.get("sensor.two").last_updated + await async_recorder_block_till_done(hass) + hass.states.async_set("switch.excluded", "off", attributes={"any": "again"}) + await async_wait_recording_done(hass) + + await async_wait_recording_done(hass) + + future = now + timedelta(seconds=10) + + client = await hass_ws_client() + init_listeners = hass.bus.async_listeners() + await client.send_json( + { + "id": 1, + "type": "history/stream", + "entity_ids": wanted_entities, + "start_time": now.isoformat(), + "end_time": future.isoformat(), + "include_start_time_state": True, + "significant_changes_only": False, + "no_attributes": True, + "minimal_response": True, + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 1 + assert response["type"] == "result" + + response = await client.receive_json() + first_end_time = sensor_two_last_updated.timestamp() + + assert response == { + "event": { + "end_time": first_end_time, + "start_time": now.timestamp(), + "states": { + "sensor.one": [ + {"a": {}, "lu": sensor_one_last_updated.timestamp(), "s": "on"} + ], + "sensor.two": [ + {"a": {}, "lu": sensor_two_last_updated.timestamp(), "s": "off"} + ], + }, + }, + "id": 1, + "type": "event", + } + + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.one", "one", attributes={"any": "attr"}) + hass.states.async_set("sensor.two", "two", attributes={"any": "attr"}) + await async_recorder_block_till_done(hass) + + sensor_one_last_updated = hass.states.get("sensor.one").last_updated + sensor_two_last_updated = hass.states.get("sensor.two").last_updated + response = await client.receive_json() + assert response == { + "event": { + "states": { + "sensor.one": [{"lu": sensor_one_last_updated.timestamp(), "s": "one"}], + "sensor.two": [{"lu": sensor_two_last_updated.timestamp(), "s": "two"}], + }, + }, + "id": 1, + "type": "event", + } + + async_fire_time_changed(hass, future + timedelta(seconds=1)) + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.two", "future", attributes={"any": "attr"}) + # Check our listener got unsubscribed + await async_wait_recording_done(hass) + await async_recorder_block_till_done(hass) + assert listeners_without_writes( + hass.bus.async_listeners() + ) == listeners_without_writes(init_listeners) + + +@pytest.mark.parametrize("include_start_time_state", (True, False)) +async def test_history_stream_before_history_starts( + recorder_mock, hass, hass_ws_client, include_start_time_state +): + """Test history stream before we have history.""" + sort_order = ["sensor.two", "sensor.four", "sensor.one"] + await async_setup_component( + hass, + "history", + { + history.DOMAIN: { + history.CONF_ORDER: True, + CONF_INCLUDE: { + CONF_ENTITIES: sort_order, + CONF_DOMAINS: ["sensor"], + }, + } + }, + ) + await async_setup_component(hass, "sensor", {}) + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.one", "on", attributes={"any": "attr"}) + await async_recorder_block_till_done(hass) + await async_wait_recording_done(hass) + far_past = dt_util.utcnow() - timedelta(days=1000) + far_past_end = far_past + timedelta(seconds=10) + + client = await hass_ws_client() + await client.send_json( + { + "id": 1, + "type": "history/stream", + "entity_ids": ["sensor.one"], + "start_time": far_past.isoformat(), + "end_time": far_past_end.isoformat(), + "include_start_time_state": include_start_time_state, + "significant_changes_only": False, + "no_attributes": True, + "minimal_response": True, + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 1 + assert response["type"] == "result" + + response = await client.receive_json() + assert response == { + "event": { + "end_time": far_past_end.timestamp(), + "start_time": far_past.timestamp(), + "states": {}, + }, + "id": 1, + "type": "event", + } + + +async def test_history_stream_for_entity_with_no_possible_changes( + recorder_mock, hass, hass_ws_client +): + """Test history stream for future with no possible changes where end time is less than or equal to now.""" + sort_order = ["sensor.two", "sensor.four", "sensor.one"] + await async_setup_component( + hass, + "history", + { + history.DOMAIN: { + history.CONF_ORDER: True, + CONF_INCLUDE: { + CONF_ENTITIES: sort_order, + CONF_DOMAINS: ["sensor"], + }, + } + }, + ) + await async_setup_component(hass, "sensor", {}) + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.one", "on", attributes={"any": "attr"}) + await async_recorder_block_till_done(hass) + await async_wait_recording_done(hass) + + last_updated = hass.states.get("sensor.one").last_updated + start_time = last_updated + timedelta(seconds=10) + end_time = start_time + timedelta(seconds=10) + + with freeze_time(end_time): + client = await hass_ws_client() + await client.send_json( + { + "id": 1, + "type": "history/stream", + "entity_ids": ["sensor.one"], + "start_time": start_time.isoformat(), + "end_time": end_time.isoformat(), + "include_start_time_state": False, + "significant_changes_only": False, + "no_attributes": True, + "minimal_response": True, + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 1 + assert response["type"] == "result" + + response = await client.receive_json() + assert response == { + "event": { + "end_time": end_time.timestamp(), + "start_time": start_time.timestamp(), + "states": {}, + }, + "id": 1, + "type": "event", + } + + +async def test_overflow_queue(recorder_mock, hass, hass_ws_client): + """Test overflowing the history stream queue.""" + now = dt_util.utcnow() + wanted_entities = ["sensor.two", "sensor.four", "sensor.one"] + with patch.object(websocket_api, "MAX_PENDING_HISTORY_STATES", 5): + await async_setup_component( + hass, + "history", + {history.DOMAIN: {}}, + ) + await async_setup_component(hass, "sensor", {}) + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.one", "on", attributes={"any": "attr"}) + sensor_one_last_updated = hass.states.get("sensor.one").last_updated + await async_recorder_block_till_done(hass) + hass.states.async_set("sensor.two", "off", attributes={"any": "attr"}) + sensor_two_last_updated = hass.states.get("sensor.two").last_updated + await async_recorder_block_till_done(hass) + hass.states.async_set("switch.excluded", "off", attributes={"any": "again"}) + await async_wait_recording_done(hass) + + await async_wait_recording_done(hass) + + client = await hass_ws_client() + init_listeners = hass.bus.async_listeners() + + await client.send_json( + { + "id": 1, + "type": "history/stream", + "entity_ids": wanted_entities, + "start_time": now.isoformat(), + "include_start_time_state": True, + "significant_changes_only": False, + "no_attributes": True, + "minimal_response": True, + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 1 + assert response["type"] == "result" + + response = await client.receive_json() + first_end_time = sensor_two_last_updated.timestamp() + + assert response == { + "event": { + "end_time": first_end_time, + "start_time": now.timestamp(), + "states": { + "sensor.one": [ + {"a": {}, "lu": sensor_one_last_updated.timestamp(), "s": "on"} + ], + "sensor.two": [ + {"a": {}, "lu": sensor_two_last_updated.timestamp(), "s": "off"} + ], + }, + }, + "id": 1, + "type": "event", + } + + await async_recorder_block_till_done(hass) + # Overflow the queue + for val in range(10): + hass.states.async_set("sensor.one", str(val), attributes={"any": "attr"}) + hass.states.async_set("sensor.two", str(val), attributes={"any": "attr"}) + await async_recorder_block_till_done(hass) + + assert listeners_without_writes( + hass.bus.async_listeners() + ) == listeners_without_writes(init_listeners)