From 9c3f9491651f409e8b4d0d645115b55b14f06165 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Sun, 22 May 2022 14:57:54 -0500 Subject: [PATCH] Add live streaming logbook websocket endpoint (#72258) Co-authored-by: Paulus Schoutsen --- homeassistant/components/logbook/__init__.py | 732 +---------- homeassistant/components/logbook/const.py | 39 + homeassistant/components/logbook/helpers.py | 193 +++ homeassistant/components/logbook/models.py | 113 ++ homeassistant/components/logbook/processor.py | 488 +++++++ homeassistant/components/logbook/rest_api.py | 120 ++ .../components/logbook/websocket_api.py | 319 +++++ homeassistant/components/recorder/core.py | 7 + homeassistant/components/recorder/tasks.py | 14 + .../components/websocket_api/messages.py | 2 +- homeassistant/core.py | 5 +- tests/components/logbook/common.py | 21 +- tests/components/logbook/test_init.py | 22 +- .../components/logbook/test_websocket_api.py | 1166 +++++++++++++++++ tests/components/recorder/common.py | 31 +- tests/components/recorder/test_init.py | 23 + tests/test_core.py | 30 + 17 files changed, 2592 insertions(+), 733 deletions(-) create mode 100644 homeassistant/components/logbook/const.py create mode 100644 homeassistant/components/logbook/helpers.py create mode 100644 homeassistant/components/logbook/models.py create mode 100644 homeassistant/components/logbook/processor.py create mode 100644 homeassistant/components/logbook/rest_api.py create mode 100644 homeassistant/components/logbook/websocket_api.py create mode 100644 tests/components/logbook/test_websocket_api.py diff --git a/homeassistant/components/logbook/__init__.py b/homeassistant/components/logbook/__init__.py index 635868310f6..d9893781748 100644 --- a/homeassistant/components/logbook/__init__.py +++ b/homeassistant/components/logbook/__init__.py @@ -1,63 +1,25 @@ """Event parser and human readable log generator.""" from __future__ import annotations -from collections.abc import Callable, Generator -from contextlib import suppress -from datetime import datetime as dt, timedelta -from http import HTTPStatus -import json -import logging -import re -from typing import Any, cast +from collections.abc import Callable +from typing import Any -from aiohttp import web -from sqlalchemy.engine.row import Row -from sqlalchemy.orm.query import Query import voluptuous as vol -from homeassistant.components import frontend, websocket_api -from homeassistant.components.automation import EVENT_AUTOMATION_TRIGGERED -from homeassistant.components.http import HomeAssistantView -from homeassistant.components.recorder import get_instance +from homeassistant.components import frontend from homeassistant.components.recorder.filters import ( - Filters, sqlalchemy_filter_from_include_exclude_conf, ) -from homeassistant.components.recorder.models import ( - process_datetime_to_timestamp, - process_timestamp_to_utc_isoformat, -) -from homeassistant.components.recorder.util import session_scope -from homeassistant.components.script import EVENT_SCRIPT_STARTED -from homeassistant.components.sensor import ATTR_STATE_CLASS, DOMAIN as SENSOR_DOMAIN -from homeassistant.components.websocket_api import messages -from homeassistant.components.websocket_api.const import JSON_DUMP from homeassistant.const import ( ATTR_DOMAIN, ATTR_ENTITY_ID, - ATTR_FRIENDLY_NAME, ATTR_NAME, - ATTR_SERVICE, - EVENT_CALL_SERVICE, EVENT_LOGBOOK_ENTRY, ) -from homeassistant.core import ( - Context, - Event, - HomeAssistant, - ServiceCall, - callback, - split_entity_id, -) -from 
homeassistant.exceptions import InvalidEntityFormatError -from homeassistant.helpers import ( - config_validation as cv, - device_registry as dr, - entity_registry as er, -) +from homeassistant.core import Context, Event, HomeAssistant, ServiceCall, callback +from homeassistant.helpers import config_validation as cv from homeassistant.helpers.entityfilter import ( INCLUDE_EXCLUDE_BASE_FILTER_SCHEMA, - EntityFilter, convert_include_exclude_filter, ) from homeassistant.helpers.integration_platform import ( @@ -65,47 +27,25 @@ from homeassistant.helpers.integration_platform import ( ) from homeassistant.helpers.typing import ConfigType from homeassistant.loader import bind_hass -import homeassistant.util.dt as dt_util -from .queries import statement_for_request -from .queries.common import PSUEDO_EVENT_STATE_CHANGED - -_LOGGER = logging.getLogger(__name__) - -ENTITY_ID_JSON_EXTRACT = re.compile('"entity_id": ?"([^"]+)"') -DOMAIN_JSON_EXTRACT = re.compile('"domain": ?"([^"]+)"') -ATTR_MESSAGE = "message" - -DOMAIN = "logbook" +from . import rest_api, websocket_api +from .const import ( + ATTR_MESSAGE, + DOMAIN, + LOGBOOK_ENTITIES_FILTER, + LOGBOOK_ENTRY_DOMAIN, + LOGBOOK_ENTRY_ENTITY_ID, + LOGBOOK_ENTRY_MESSAGE, + LOGBOOK_ENTRY_NAME, + LOGBOOK_FILTERS, +) +from .const import LOGBOOK_ENTRY_ICON # noqa: F401 +from .models import LazyEventPartialState # noqa: F401 CONFIG_SCHEMA = vol.Schema( {DOMAIN: INCLUDE_EXCLUDE_BASE_FILTER_SCHEMA}, extra=vol.ALLOW_EXTRA ) -CONTEXT_USER_ID = "context_user_id" -CONTEXT_ENTITY_ID = "context_entity_id" -CONTEXT_ENTITY_ID_NAME = "context_entity_id_name" -CONTEXT_EVENT_TYPE = "context_event_type" -CONTEXT_DOMAIN = "context_domain" -CONTEXT_STATE = "context_state" -CONTEXT_SERVICE = "context_service" -CONTEXT_NAME = "context_name" -CONTEXT_MESSAGE = "context_message" - -LOGBOOK_ENTRY_DOMAIN = "domain" -LOGBOOK_ENTRY_ENTITY_ID = "entity_id" -LOGBOOK_ENTRY_ICON = "icon" -LOGBOOK_ENTRY_MESSAGE = "message" -LOGBOOK_ENTRY_NAME = "name" -LOGBOOK_ENTRY_STATE = "state" -LOGBOOK_ENTRY_WHEN = "when" - -ALL_EVENT_TYPES_EXCEPT_STATE_CHANGED = {EVENT_LOGBOOK_ENTRY, EVENT_CALL_SERVICE} -ENTITY_EVENTS_WITHOUT_CONFIG_ENTRY = { - EVENT_LOGBOOK_ENTRY, - EVENT_AUTOMATION_TRIGGERED, - EVENT_SCRIPT_STARTED, -} LOG_MESSAGE_SCHEMA = vol.Schema( { @@ -117,10 +57,6 @@ LOG_MESSAGE_SCHEMA = vol.Schema( ) -LOGBOOK_FILTERS = "logbook_filters" -LOGBOOK_ENTITIES_FILTER = "entities_filter" - - @bind_hass def log_entry( hass: HomeAssistant, @@ -186,13 +122,10 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: else: filters = None entities_filter = None - hass.data[LOGBOOK_FILTERS] = filters hass.data[LOGBOOK_ENTITIES_FILTER] = entities_filter - - hass.http.register_view(LogbookView(conf, filters, entities_filter)) - websocket_api.async_register_command(hass, ws_get_events) - + websocket_api.async_setup(hass) + rest_api.async_setup(hass, config, filters, entities_filter) hass.services.async_register(DOMAIN, "log", log_message, schema=LOG_MESSAGE_SCHEMA) await async_process_integration_platforms(hass, DOMAIN, _process_logbook_platform) @@ -215,628 +148,3 @@ async def _process_logbook_platform( hass.data[DOMAIN][event_name] = (domain, describe_callback) platform.async_describe_events(hass, _async_describe_event) - - -def _async_determine_event_types( - hass: HomeAssistant, entity_ids: list[str] | None, device_ids: list[str] | None -) -> tuple[str, ...]: - """Reduce the event types based on the entity ids and device ids.""" - external_events: dict[ - str, tuple[str, 
Callable[[LazyEventPartialState], dict[str, Any]]] - ] = hass.data.get(DOMAIN, {}) - if not entity_ids and not device_ids: - return (*ALL_EVENT_TYPES_EXCEPT_STATE_CHANGED, *external_events) - config_entry_ids: set[str] = set() - intrested_event_types: set[str] = set() - - if entity_ids: - # - # Home Assistant doesn't allow firing events from - # entities so we have a limited list to check - # - # automations and scripts can refer to entities - # but they do not have a config entry so we need - # to add them. - # - # We also allow entity_ids to be recorded via - # manual logbook entries. - # - intrested_event_types |= ENTITY_EVENTS_WITHOUT_CONFIG_ENTRY - - if device_ids: - dev_reg = dr.async_get(hass) - for device_id in device_ids: - if (device := dev_reg.async_get(device_id)) and device.config_entries: - config_entry_ids |= device.config_entries - interested_domains: set[str] = set() - for entry_id in config_entry_ids: - if entry := hass.config_entries.async_get_entry(entry_id): - interested_domains.add(entry.domain) - for external_event, domain_call in external_events.items(): - if domain_call[0] in interested_domains: - intrested_event_types.add(external_event) - - return tuple( - event_type - for event_type in (EVENT_LOGBOOK_ENTRY, *external_events) - if event_type in intrested_event_types - ) - - -def _ws_formatted_get_events( - hass: HomeAssistant, - msg_id: int, - start_day: dt, - end_day: dt, - event_types: tuple[str, ...], - ent_reg: er.EntityRegistry, - entity_ids: list[str] | None = None, - device_ids: list[str] | None = None, - filters: Filters | None = None, - entities_filter: EntityFilter | Callable[[str], bool] | None = None, - context_id: str | None = None, -) -> str: - """Fetch events and convert them to json in the executor.""" - return JSON_DUMP( - messages.result_message( - msg_id, - _get_events( - hass, - start_day, - end_day, - event_types, - ent_reg, - entity_ids, - device_ids, - filters, - entities_filter, - context_id, - True, - False, - ), - ) - ) - - -@websocket_api.websocket_command( - { - vol.Required("type"): "logbook/get_events", - vol.Required("start_time"): str, - vol.Optional("end_time"): str, - vol.Optional("entity_ids"): [str], - vol.Optional("device_ids"): [str], - vol.Optional("context_id"): str, - } -) -@websocket_api.async_response -async def ws_get_events( - hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict -) -> None: - """Handle logbook get events websocket command.""" - start_time_str = msg["start_time"] - end_time_str = msg.get("end_time") - utc_now = dt_util.utcnow() - - if start_time := dt_util.parse_datetime(start_time_str): - start_time = dt_util.as_utc(start_time) - else: - connection.send_error(msg["id"], "invalid_start_time", "Invalid start_time") - return - - if not end_time_str: - end_time = utc_now - elif parsed_end_time := dt_util.parse_datetime(end_time_str): - end_time = dt_util.as_utc(parsed_end_time) - else: - connection.send_error(msg["id"], "invalid_end_time", "Invalid end_time") - return - - if start_time > utc_now: - connection.send_result(msg["id"], []) - return - - device_ids = msg.get("device_ids") - entity_ids = msg.get("entity_ids") - context_id = msg.get("context_id") - event_types = _async_determine_event_types(hass, entity_ids, device_ids) - ent_reg = er.async_get(hass) - - connection.send_message( - await get_instance(hass).async_add_executor_job( - _ws_formatted_get_events, - hass, - msg["id"], - start_time, - end_time, - event_types, - ent_reg, - entity_ids, - device_ids, - 
hass.data[LOGBOOK_FILTERS], - hass.data[LOGBOOK_ENTITIES_FILTER], - context_id, - ) - ) - - -class LogbookView(HomeAssistantView): - """Handle logbook view requests.""" - - url = "/api/logbook" - name = "api:logbook" - extra_urls = ["/api/logbook/{datetime}"] - - def __init__( - self, - config: dict[str, Any], - filters: Filters | None, - entities_filter: EntityFilter | None, - ) -> None: - """Initialize the logbook view.""" - self.config = config - self.filters = filters - self.entities_filter = entities_filter - - async def get( - self, request: web.Request, datetime: str | None = None - ) -> web.Response: - """Retrieve logbook entries.""" - if datetime: - if (datetime_dt := dt_util.parse_datetime(datetime)) is None: - return self.json_message("Invalid datetime", HTTPStatus.BAD_REQUEST) - else: - datetime_dt = dt_util.start_of_local_day() - - if (period_str := request.query.get("period")) is None: - period: int = 1 - else: - period = int(period_str) - - if entity_ids_str := request.query.get("entity"): - try: - entity_ids = cv.entity_ids(entity_ids_str) - except vol.Invalid: - raise InvalidEntityFormatError( - f"Invalid entity id(s) encountered: {entity_ids_str}. " - "Format should be ." - ) from vol.Invalid - else: - entity_ids = None - - if (end_time_str := request.query.get("end_time")) is None: - start_day = dt_util.as_utc(datetime_dt) - timedelta(days=period - 1) - end_day = start_day + timedelta(days=period) - else: - start_day = datetime_dt - if (end_day_dt := dt_util.parse_datetime(end_time_str)) is None: - return self.json_message("Invalid end_time", HTTPStatus.BAD_REQUEST) - end_day = end_day_dt - - hass = request.app["hass"] - - context_id = request.query.get("context_id") - - if entity_ids and context_id: - return self.json_message( - "Can't combine entity with context_id", HTTPStatus.BAD_REQUEST - ) - - event_types = _async_determine_event_types(hass, entity_ids, None) - ent_reg = er.async_get(hass) - - def json_events() -> web.Response: - """Fetch events and generate JSON.""" - return self.json( - _get_events( - hass, - start_day, - end_day, - event_types, - ent_reg, - entity_ids, - None, - self.filters, - self.entities_filter, - context_id, - False, - True, - ) - ) - - return cast( - web.Response, await get_instance(hass).async_add_executor_job(json_events) - ) - - -def _humanify( - rows: Generator[Row, None, None], - entities_filter: EntityFilter | Callable[[str], bool] | None, - ent_reg: er.EntityRegistry, - external_events: dict[ - str, tuple[str, Callable[[LazyEventPartialState], dict[str, Any]]] - ], - entity_name_cache: EntityNameCache, - format_time: Callable[[Row], Any], - include_entity_name: bool = True, -) -> Generator[dict[str, Any], None, None]: - """Generate a converted list of events into entries.""" - # Continuous sensors, will be excluded from the logbook - continuous_sensors: dict[str, bool] = {} - event_data_cache: dict[str, dict[str, Any]] = {} - context_lookup: dict[str | None, Row | None] = {None: None} - event_cache = EventCache(event_data_cache) - context_augmenter = ContextAugmenter( - context_lookup, - entity_name_cache, - external_events, - event_cache, - include_entity_name, - ) - - def _keep_row(row: Row, event_type: str) -> bool: - """Check if the entity_filter rejects a row.""" - assert entities_filter is not None - if entity_id := _row_event_data_extract(row, ENTITY_ID_JSON_EXTRACT): - return entities_filter(entity_id) - - if event_type in external_events: - # If the entity_id isn't described, use the domain that describes - # the event for 
filtering. - domain: str | None = external_events[event_type][0] - else: - domain = _row_event_data_extract(row, DOMAIN_JSON_EXTRACT) - - return domain is not None and entities_filter(f"{domain}._") - - # Process rows - for row in rows: - context_id = row.context_id - context_lookup.setdefault(context_id, row) - if row.context_only: - continue - event_type = row.event_type - if event_type == EVENT_CALL_SERVICE or ( - event_type is not PSUEDO_EVENT_STATE_CHANGED - and entities_filter is not None - and not _keep_row(row, event_type) - ): - continue - - if event_type is PSUEDO_EVENT_STATE_CHANGED: - entity_id = row.entity_id - assert entity_id is not None - # Skip continuous sensors - if ( - is_continuous := continuous_sensors.get(entity_id) - ) is None and split_entity_id(entity_id)[0] == SENSOR_DOMAIN: - is_continuous = _is_sensor_continuous(ent_reg, entity_id) - continuous_sensors[entity_id] = is_continuous - if is_continuous: - continue - - data = { - LOGBOOK_ENTRY_WHEN: format_time(row), - LOGBOOK_ENTRY_STATE: row.state, - LOGBOOK_ENTRY_ENTITY_ID: entity_id, - } - if include_entity_name: - data[LOGBOOK_ENTRY_NAME] = entity_name_cache.get(entity_id, row) - if icon := row.icon or row.old_format_icon: - data[LOGBOOK_ENTRY_ICON] = icon - - context_augmenter.augment(data, row, context_id) - yield data - - elif event_type in external_events: - domain, describe_event = external_events[event_type] - data = describe_event(event_cache.get(row)) - data[LOGBOOK_ENTRY_WHEN] = format_time(row) - data[LOGBOOK_ENTRY_DOMAIN] = domain - context_augmenter.augment(data, row, context_id) - yield data - - elif event_type == EVENT_LOGBOOK_ENTRY: - event = event_cache.get(row) - if not (event_data := event.data): - continue - entry_domain = event_data.get(ATTR_DOMAIN) - entry_entity_id = event_data.get(ATTR_ENTITY_ID) - if entry_domain is None and entry_entity_id is not None: - with suppress(IndexError): - entry_domain = split_entity_id(str(entry_entity_id))[0] - - data = { - LOGBOOK_ENTRY_WHEN: format_time(row), - LOGBOOK_ENTRY_NAME: event_data.get(ATTR_NAME), - LOGBOOK_ENTRY_MESSAGE: event_data.get(ATTR_MESSAGE), - LOGBOOK_ENTRY_DOMAIN: entry_domain, - LOGBOOK_ENTRY_ENTITY_ID: entry_entity_id, - } - context_augmenter.augment(data, row, context_id) - yield data - - -def _get_events( - hass: HomeAssistant, - start_day: dt, - end_day: dt, - event_types: tuple[str, ...], - ent_reg: er.EntityRegistry, - entity_ids: list[str] | None = None, - device_ids: list[str] | None = None, - filters: Filters | None = None, - entities_filter: EntityFilter | Callable[[str], bool] | None = None, - context_id: str | None = None, - timestamp: bool = False, - include_entity_name: bool = True, -) -> list[dict[str, Any]]: - """Get events for a period of time.""" - assert not ( - context_id and (entity_ids or device_ids) - ), "can't pass in both context_id and (entity_ids or device_ids)" - external_events: dict[ - str, tuple[str, Callable[[LazyEventPartialState], dict[str, Any]]] - ] = hass.data.get(DOMAIN, {}) - format_time = _row_time_fired_timestamp if timestamp else _row_time_fired_isoformat - entity_name_cache = EntityNameCache(hass) - if entity_ids or device_ids: - entities_filter = None - - def yield_rows(query: Query) -> Generator[Row, None, None]: - """Yield rows from the database.""" - # end_day - start_day intentionally checks .days and not .total_seconds() - # since we don't want to switch over to buffered if they go - # over one day by a few hours since the UI makes it so easy to do that. 
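Note that the `_keep_row` filter above never calls `json.loads` on a row: it pulls single fields straight out of the serialized `event_data` with the `ENTITY_ID_JSON_EXTRACT`/`DOMAIN_JSON_EXTRACT` regexes defined earlier in the module. A minimal standalone sketch of that extraction technique (the sample payload is hypothetical, not taken from the patch):

```python
# Sketch of the regex-based field extraction used by _keep_row /
# _row_event_data_extract: grab one field from serialized event_data
# without paying for a full JSON parse per row.
from __future__ import annotations

import re

ENTITY_ID_JSON_EXTRACT = re.compile('"entity_id": ?"([^"]+)"')
DOMAIN_JSON_EXTRACT = re.compile('"domain": ?"([^"]+)"')


def extract(event_data: str | None, extractor: re.Pattern) -> str | None:
    """Pull a single field out of serialized event_data."""
    result = extractor.search(event_data or "")
    return result.group(1) if result else None


# Hypothetical serialized event_data payload:
raw = '{"domain": "light", "entity_id": "light.kitchen", "service": "turn_on"}'
assert extract(raw, ENTITY_ID_JSON_EXTRACT) == "light.kitchen"
assert extract(raw, DOMAIN_JSON_EXTRACT) == "light"
assert extract(None, DOMAIN_JSON_EXTRACT) is None
```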
- if entity_ids or context_id or (end_day - start_day).days <= 1: - return query.all() # type: ignore[no-any-return] - # Only buffer rows to reduce memory pressure - # if we expect the result set is going to be very large. - # What is considered very large is going to differ - # based on the hardware Home Assistant is running on. - # - # sqlalchemy suggests that is at least 10k, but for - # even and RPi3 that number seems higher in testing - # so we don't switch over until we request > 1 day+ of data. - # - return query.yield_per(1024) # type: ignore[no-any-return] - - stmt = statement_for_request( - start_day, - end_day, - event_types, - entity_ids, - device_ids, - filters, - context_id, - ) - if _LOGGER.isEnabledFor(logging.DEBUG): - _LOGGER.debug( - "Literal statement: %s", - stmt.compile(compile_kwargs={"literal_binds": True}), - ) - - with session_scope(hass=hass) as session: - return list( - _humanify( - yield_rows(session.execute(stmt)), - entities_filter, - ent_reg, - external_events, - entity_name_cache, - format_time, - include_entity_name, - ) - ) - - -class ContextAugmenter: - """Augment data with context trace.""" - - def __init__( - self, - context_lookup: dict[str | None, Row | None], - entity_name_cache: EntityNameCache, - external_events: dict[ - str, tuple[str, Callable[[LazyEventPartialState], dict[str, Any]]] - ], - event_cache: EventCache, - include_entity_name: bool, - ) -> None: - """Init the augmenter.""" - self.context_lookup = context_lookup - self.entity_name_cache = entity_name_cache - self.external_events = external_events - self.event_cache = event_cache - self.include_entity_name = include_entity_name - - def augment(self, data: dict[str, Any], row: Row, context_id: str) -> None: - """Augment data from the row and cache.""" - if context_user_id := row.context_user_id: - data[CONTEXT_USER_ID] = context_user_id - - if not (context_row := self.context_lookup.get(context_id)): - return - - if _rows_match(row, context_row): - # This is the first event with the given ID. Was it directly caused by - # a parent event? - if ( - not row.context_parent_id - or (context_row := self.context_lookup.get(row.context_parent_id)) - is None - ): - return - # Ensure the (parent) context_event exists and is not the root cause of - # this log entry. 
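The `yield_rows` helper removed here (and re-added in `processor.py` below) picks between eager and buffered fetching based on the requested window. A reduced sketch of just that decision, with the SQLAlchemy query replaced by a returned label so the logic can be exercised in isolation:

```python
# Sketch of the eager-vs-buffered fetch decision in yield_rows.
# BUFFER_SIZE mirrors the yield_per(1024) in the patch; the string
# return value stands in for query.all() / query.yield_per(...).
from datetime import datetime, timedelta

BUFFER_SIZE = 1024


def fetch_strategy(start_day: datetime, end_day: datetime, limited_select: bool) -> str:
    # .days (not .total_seconds()) so "one day plus a few hours",
    # which the UI makes easy to request, still loads eagerly.
    if limited_select or (end_day - start_day).days <= 1:
        return "all"  # small result set: load everything at once
    return f"yield_per({BUFFER_SIZE})"  # large scan: buffer to cap memory


now = datetime(2022, 5, 22)
assert fetch_strategy(now, now + timedelta(hours=30), False) == "all"
assert fetch_strategy(now, now + timedelta(days=3), False) == "yield_per(1024)"
assert fetch_strategy(now, now + timedelta(days=3), True) == "all"  # filtered query
```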
- if _rows_match(row, context_row): - return - - event_type = context_row.event_type - - # State change - if context_entity_id := context_row.entity_id: - data[CONTEXT_STATE] = context_row.state - data[CONTEXT_ENTITY_ID] = context_entity_id - if self.include_entity_name: - data[CONTEXT_ENTITY_ID_NAME] = self.entity_name_cache.get( - context_entity_id, context_row - ) - return - - # Call service - if event_type == EVENT_CALL_SERVICE: - event = self.event_cache.get(context_row) - event_data = event.data - data[CONTEXT_DOMAIN] = event_data.get(ATTR_DOMAIN) - data[CONTEXT_SERVICE] = event_data.get(ATTR_SERVICE) - data[CONTEXT_EVENT_TYPE] = event_type - return - - if event_type not in self.external_events: - return - - domain, describe_event = self.external_events[event_type] - data[CONTEXT_EVENT_TYPE] = event_type - data[CONTEXT_DOMAIN] = domain - event = self.event_cache.get(context_row) - described = describe_event(event) - if name := described.get(ATTR_NAME): - data[CONTEXT_NAME] = name - if message := described.get(ATTR_MESSAGE): - data[CONTEXT_MESSAGE] = message - if not (attr_entity_id := described.get(ATTR_ENTITY_ID)): - return - data[CONTEXT_ENTITY_ID] = attr_entity_id - if self.include_entity_name: - data[CONTEXT_ENTITY_ID_NAME] = self.entity_name_cache.get( - attr_entity_id, context_row - ) - - -def _is_sensor_continuous(ent_reg: er.EntityRegistry, entity_id: str) -> bool: - """Determine if a sensor is continuous by checking its state class. - - Sensors with a unit_of_measurement are also considered continuous, but are filtered - already by the SQL query generated by _get_events - """ - if not (entry := ent_reg.async_get(entity_id)): - # Entity not registered, so can't have a state class - return False - return ( - entry.capabilities is not None - and entry.capabilities.get(ATTR_STATE_CLASS) is not None - ) - - -def _rows_match(row: Row, other_row: Row) -> bool: - """Check of rows match by using the same method as Events __hash__.""" - if ( - (state_id := row.state_id) is not None - and state_id == other_row.state_id - or (event_id := row.event_id) is not None - and event_id == other_row.event_id - ): - return True - return False - - -def _row_event_data_extract(row: Row, extractor: re.Pattern) -> str | None: - """Extract from event_data row.""" - result = extractor.search(row.shared_data or row.event_data or "") - return result.group(1) if result else None - - -def _row_time_fired_isoformat(row: Row) -> str: - """Convert the row timed_fired to isoformat.""" - return process_timestamp_to_utc_isoformat(row.time_fired or dt_util.utcnow()) - - -def _row_time_fired_timestamp(row: Row) -> float: - """Convert the row timed_fired to timestamp.""" - return process_datetime_to_timestamp(row.time_fired or dt_util.utcnow()) - - -class LazyEventPartialState: - """A lazy version of core Event with limited State joined in.""" - - __slots__ = [ - "row", - "_event_data", - "_event_data_cache", - "event_type", - "entity_id", - "state", - "context_id", - "context_user_id", - "context_parent_id", - "data", - ] - - def __init__( - self, - row: Row, - event_data_cache: dict[str, dict[str, Any]], - ) -> None: - """Init the lazy event.""" - self.row = row - self._event_data: dict[str, Any] | None = None - self._event_data_cache = event_data_cache - self.event_type: str = self.row.event_type - self.entity_id: str | None = self.row.entity_id - self.state = self.row.state - self.context_id: str | None = self.row.context_id - self.context_user_id: str | None = self.row.context_user_id - self.context_parent_id: 
str | None = self.row.context_parent_id - source: str = self.row.shared_data or self.row.event_data - if not source: - self.data = {} - elif event_data := self._event_data_cache.get(source): - self.data = event_data - else: - self.data = self._event_data_cache[source] = cast( - dict[str, Any], json.loads(source) - ) - - -class EntityNameCache: - """A cache to lookup the name for an entity. - - This class should not be used to lookup attributes - that are expected to change state. - """ - - def __init__(self, hass: HomeAssistant) -> None: - """Init the cache.""" - self._hass = hass - self._names: dict[str, str] = {} - - def get(self, entity_id: str, row: Row) -> str: - """Lookup an the friendly name.""" - if entity_id in self._names: - return self._names[entity_id] - if (current_state := self._hass.states.get(entity_id)) and ( - friendly_name := current_state.attributes.get(ATTR_FRIENDLY_NAME) - ): - self._names[entity_id] = friendly_name - else: - return split_entity_id(entity_id)[1].replace("_", " ") - - return self._names[entity_id] - - -class EventCache: - """Cache LazyEventPartialState by row.""" - - def __init__(self, event_data_cache: dict[str, dict[str, Any]]) -> None: - """Init the cache.""" - self._event_data_cache = event_data_cache - self.event_cache: dict[Row, LazyEventPartialState] = {} - - def get(self, row: Row) -> LazyEventPartialState: - """Get the event from the row.""" - if event := self.event_cache.get(row): - return event - event = self.event_cache[row] = LazyEventPartialState( - row, self._event_data_cache - ) - return event diff --git a/homeassistant/components/logbook/const.py b/homeassistant/components/logbook/const.py new file mode 100644 index 00000000000..406406e9dd8 --- /dev/null +++ b/homeassistant/components/logbook/const.py @@ -0,0 +1,39 @@ +"""Event parser and human readable log generator.""" +from __future__ import annotations + +from homeassistant.components.automation import EVENT_AUTOMATION_TRIGGERED +from homeassistant.components.script import EVENT_SCRIPT_STARTED +from homeassistant.const import EVENT_CALL_SERVICE, EVENT_LOGBOOK_ENTRY + +ATTR_MESSAGE = "message" + +DOMAIN = "logbook" + +CONTEXT_USER_ID = "context_user_id" +CONTEXT_ENTITY_ID = "context_entity_id" +CONTEXT_ENTITY_ID_NAME = "context_entity_id_name" +CONTEXT_EVENT_TYPE = "context_event_type" +CONTEXT_DOMAIN = "context_domain" +CONTEXT_STATE = "context_state" +CONTEXT_SERVICE = "context_service" +CONTEXT_NAME = "context_name" +CONTEXT_MESSAGE = "context_message" + +LOGBOOK_ENTRY_DOMAIN = "domain" +LOGBOOK_ENTRY_ENTITY_ID = "entity_id" +LOGBOOK_ENTRY_ICON = "icon" +LOGBOOK_ENTRY_MESSAGE = "message" +LOGBOOK_ENTRY_NAME = "name" +LOGBOOK_ENTRY_STATE = "state" +LOGBOOK_ENTRY_WHEN = "when" + +ALL_EVENT_TYPES_EXCEPT_STATE_CHANGED = {EVENT_LOGBOOK_ENTRY, EVENT_CALL_SERVICE} +ENTITY_EVENTS_WITHOUT_CONFIG_ENTRY = { + EVENT_LOGBOOK_ENTRY, + EVENT_AUTOMATION_TRIGGERED, + EVENT_SCRIPT_STARTED, +} + + +LOGBOOK_FILTERS = "logbook_filters" +LOGBOOK_ENTITIES_FILTER = "entities_filter" diff --git a/homeassistant/components/logbook/helpers.py b/homeassistant/components/logbook/helpers.py new file mode 100644 index 00000000000..cc9ea238f8b --- /dev/null +++ b/homeassistant/components/logbook/helpers.py @@ -0,0 +1,193 @@ +"""Event parser and human readable log generator.""" +from __future__ import annotations + +from collections.abc import Callable +from typing import Any + +from homeassistant.components.sensor import ATTR_STATE_CLASS +from homeassistant.const import ( + ATTR_DEVICE_ID, + ATTR_ENTITY_ID, + 
ATTR_UNIT_OF_MEASUREMENT, + EVENT_LOGBOOK_ENTRY, + EVENT_STATE_CHANGED, +) +from homeassistant.core import ( + CALLBACK_TYPE, + Event, + HomeAssistant, + State, + callback, + is_callback, +) +from homeassistant.helpers import device_registry as dr, entity_registry as er +from homeassistant.helpers.event import async_track_state_change_event + +from .const import ( + ALL_EVENT_TYPES_EXCEPT_STATE_CHANGED, + DOMAIN, + ENTITY_EVENTS_WITHOUT_CONFIG_ENTRY, +) +from .models import LazyEventPartialState + + +def async_filter_entities(hass: HomeAssistant, entity_ids: list[str]) -> list[str]: + """Filter out any entities that logbook will not produce results for.""" + ent_reg = er.async_get(hass) + return [ + entity_id + for entity_id in entity_ids + if not _is_entity_id_filtered(hass, ent_reg, entity_id) + ] + + +def async_determine_event_types( + hass: HomeAssistant, entity_ids: list[str] | None, device_ids: list[str] | None +) -> tuple[str, ...]: + """Reduce the event types based on the entity ids and device ids.""" + external_events: dict[ + str, tuple[str, Callable[[LazyEventPartialState], dict[str, Any]]] + ] = hass.data.get(DOMAIN, {}) + if not entity_ids and not device_ids: + return (*ALL_EVENT_TYPES_EXCEPT_STATE_CHANGED, *external_events) + config_entry_ids: set[str] = set() + intrested_event_types: set[str] = set() + + if entity_ids: + # + # Home Assistant doesn't allow firing events from + # entities so we have a limited list to check + # + # automations and scripts can refer to entities + # but they do not have a config entry so we need + # to add them. + # + # We also allow entity_ids to be recorded via + # manual logbook entries. + # + intrested_event_types |= ENTITY_EVENTS_WITHOUT_CONFIG_ENTRY + + if device_ids: + dev_reg = dr.async_get(hass) + for device_id in device_ids: + if (device := dev_reg.async_get(device_id)) and device.config_entries: + config_entry_ids |= device.config_entries + interested_domains: set[str] = set() + for entry_id in config_entry_ids: + if entry := hass.config_entries.async_get_entry(entry_id): + interested_domains.add(entry.domain) + for external_event, domain_call in external_events.items(): + if domain_call[0] in interested_domains: + intrested_event_types.add(external_event) + + return tuple( + event_type + for event_type in (EVENT_LOGBOOK_ENTRY, *external_events) + if event_type in intrested_event_types + ) + + +@callback +def async_subscribe_events( + hass: HomeAssistant, + subscriptions: list[CALLBACK_TYPE], + target: Callable[[Event], None], + event_types: tuple[str, ...], + entity_ids: list[str] | None, + device_ids: list[str] | None, +) -> None: + """Subscribe to events for the entities and devices or all. + + These are the events we need to listen for to do + the live logbook stream. 
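`async_determine_event_types` walks device id → config entry → integration domain to shrink the set of event types queried. A simplified, registry-free sketch of that reduction; the dicts stand in for the device registry and `hass.data[DOMAIN]`, and every name in them is illustrative rather than taken from the patch:

```python
# Sketch of the event-type reduction in async_determine_event_types,
# with plain dicts replacing the device registry and config entries.
ENTITY_EVENTS_WITHOUT_CONFIG_ENTRY = {
    "logbook_entry", "automation_triggered", "script_started",
}
# Hypothetical stand-ins:
DEVICE_TO_DOMAINS = {"dev-1": {"hue"}, "dev-2": {"deconz"}}
EXTERNAL_EVENTS = {"hue_event": "hue", "deconz_event": "deconz", "shelly.click": "shelly"}


def determine_event_types(entity_ids, device_ids):
    if not entity_ids and not device_ids:
        return ("logbook_entry", "call_service", *EXTERNAL_EVENTS)
    interested = set()
    if entity_ids:
        # Entities only appear via manual logbook entries, automations
        # and scripts -- none of which have a config entry.
        interested |= ENTITY_EVENTS_WITHOUT_CONFIG_ENTRY
    if device_ids:
        domains = set().union(*(DEVICE_TO_DOMAINS.get(d, set()) for d in device_ids))
        interested |= {ev for ev, domain in EXTERNAL_EVENTS.items() if domain in domains}
    return tuple(ev for ev in ("logbook_entry", *EXTERNAL_EVENTS) if ev in interested)


assert determine_event_types(None, ["dev-1"]) == ("hue_event",)
assert "logbook_entry" in determine_event_types(["light.kitchen"], None)
```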
+ """ + ent_reg = er.async_get(hass) + assert is_callback(target), "target must be a callback" + event_forwarder = target + + if entity_ids or device_ids: + entity_ids_set = set(entity_ids) if entity_ids else set() + device_ids_set = set(device_ids) if device_ids else set() + + @callback + def _forward_events_filtered(event: Event) -> None: + event_data = event.data + if ( + entity_ids_set and event_data.get(ATTR_ENTITY_ID) in entity_ids_set + ) or (device_ids_set and event_data.get(ATTR_DEVICE_ID) in device_ids_set): + target(event) + + event_forwarder = _forward_events_filtered + + for event_type in event_types: + subscriptions.append( + hass.bus.async_listen(event_type, event_forwarder, run_immediately=True) + ) + + @callback + def _forward_state_events_filtered(event: Event) -> None: + if event.data.get("old_state") is None or event.data.get("new_state") is None: + return + state: State = event.data["new_state"] + if not _is_state_filtered(ent_reg, state): + target(event) + + if entity_ids: + subscriptions.append( + async_track_state_change_event( + hass, entity_ids, _forward_state_events_filtered + ) + ) + return + + # We want the firehose + subscriptions.append( + hass.bus.async_listen( + EVENT_STATE_CHANGED, + _forward_state_events_filtered, + run_immediately=True, + ) + ) + + +def is_sensor_continuous(ent_reg: er.EntityRegistry, entity_id: str) -> bool: + """Determine if a sensor is continuous by checking its state class. + + Sensors with a unit_of_measurement are also considered continuous, but are filtered + already by the SQL query generated by _get_events + """ + if not (entry := ent_reg.async_get(entity_id)): + # Entity not registered, so can't have a state class + return False + return ( + entry.capabilities is not None + and entry.capabilities.get(ATTR_STATE_CLASS) is not None + ) + + +def _is_state_filtered(ent_reg: er.EntityRegistry, state: State) -> bool: + """Check if the logbook should filter a state. + + Used when we are in live mode to ensure + we only get significant changes (state.last_changed != state.last_updated) + """ + return bool( + state.last_changed != state.last_updated + or ATTR_UNIT_OF_MEASUREMENT in state.attributes + or is_sensor_continuous(ent_reg, state.entity_id) + ) + + +def _is_entity_id_filtered( + hass: HomeAssistant, ent_reg: er.EntityRegistry, entity_id: str +) -> bool: + """Check if the logbook should filter an entity. + + Used to setup listeners and which entities to select + from the database when a list of entities is requested. 
+ """ + return bool( + (state := hass.states.get(entity_id)) + and (ATTR_UNIT_OF_MEASUREMENT in state.attributes) + or is_sensor_continuous(ent_reg, entity_id) + ) diff --git a/homeassistant/components/logbook/models.py b/homeassistant/components/logbook/models.py new file mode 100644 index 00000000000..e4844751c6a --- /dev/null +++ b/homeassistant/components/logbook/models.py @@ -0,0 +1,113 @@ +"""Event parser and human readable log generator.""" +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime as dt +import json +from typing import Any, cast + +from sqlalchemy.engine.row import Row + +from homeassistant.const import ATTR_ICON, EVENT_STATE_CHANGED +from homeassistant.core import Context, Event, State, callback + + +class LazyEventPartialState: + """A lazy version of core Event with limited State joined in.""" + + __slots__ = [ + "row", + "_event_data", + "_event_data_cache", + "event_type", + "entity_id", + "state", + "context_id", + "context_user_id", + "context_parent_id", + "data", + ] + + def __init__( + self, + row: Row | EventAsRow, + event_data_cache: dict[str, dict[str, Any]], + ) -> None: + """Init the lazy event.""" + self.row = row + self._event_data: dict[str, Any] | None = None + self._event_data_cache = event_data_cache + self.event_type: str | None = self.row.event_type + self.entity_id: str | None = self.row.entity_id + self.state = self.row.state + self.context_id: str | None = self.row.context_id + self.context_user_id: str | None = self.row.context_user_id + self.context_parent_id: str | None = self.row.context_parent_id + if data := getattr(row, "data", None): + # If its an EventAsRow we can avoid the whole + # json decode process as we already have the data + self.data = data + return + source = cast(str, self.row.shared_data or self.row.event_data) + if not source: + self.data = {} + elif event_data := self._event_data_cache.get(source): + self.data = event_data + else: + self.data = self._event_data_cache[source] = cast( + dict[str, Any], json.loads(source) + ) + + +@dataclass +class EventAsRow: + """Convert an event to a row.""" + + data: dict[str, Any] + context: Context + context_id: str + time_fired: dt + state_id: int + event_data: str | None = None + old_format_icon: None = None + event_id: None = None + entity_id: str | None = None + icon: str | None = None + context_user_id: str | None = None + context_parent_id: str | None = None + event_type: str | None = None + state: str | None = None + shared_data: str | None = None + context_only: None = None + + +@callback +def async_event_to_row(event: Event) -> EventAsRow | None: + """Convert an event to a row.""" + if event.event_type != EVENT_STATE_CHANGED: + return EventAsRow( + data=event.data, + context=event.context, + event_type=event.event_type, + context_id=event.context.id, + context_user_id=event.context.user_id, + context_parent_id=event.context.parent_id, + time_fired=event.time_fired, + state_id=hash(event), + ) + # States are prefiltered so we never get states + # that are missing new_state or old_state + # since the logbook does not show these + new_state: State = event.data["new_state"] + return EventAsRow( + data=event.data, + context=event.context, + entity_id=new_state.entity_id, + state=new_state.state, + context_id=new_state.context.id, + context_user_id=new_state.context.user_id, + context_parent_id=new_state.context.parent_id, + time_fired=new_state.last_updated, + state_id=hash(event), + icon=new_state.attributes.get(ATTR_ICON), + ) diff 
--git a/homeassistant/components/logbook/processor.py b/homeassistant/components/logbook/processor.py new file mode 100644 index 00000000000..c9f20f27f7c --- /dev/null +++ b/homeassistant/components/logbook/processor.py @@ -0,0 +1,488 @@ +"""Event parser and human readable log generator.""" +from __future__ import annotations + +from collections.abc import Callable, Generator +from contextlib import suppress +from dataclasses import dataclass +from datetime import datetime as dt +import logging +import re +from typing import Any + +from sqlalchemy.engine.row import Row +from sqlalchemy.orm.query import Query + +from homeassistant.components.recorder.filters import Filters +from homeassistant.components.recorder.models import ( + process_datetime_to_timestamp, + process_timestamp_to_utc_isoformat, +) +from homeassistant.components.recorder.util import session_scope +from homeassistant.components.sensor import DOMAIN as SENSOR_DOMAIN +from homeassistant.const import ( + ATTR_DOMAIN, + ATTR_ENTITY_ID, + ATTR_FRIENDLY_NAME, + ATTR_NAME, + ATTR_SERVICE, + EVENT_CALL_SERVICE, + EVENT_LOGBOOK_ENTRY, +) +from homeassistant.core import HomeAssistant, split_entity_id +from homeassistant.helpers import entity_registry as er +from homeassistant.helpers.entityfilter import EntityFilter +import homeassistant.util.dt as dt_util + +from .const import ( + ATTR_MESSAGE, + CONTEXT_DOMAIN, + CONTEXT_ENTITY_ID, + CONTEXT_ENTITY_ID_NAME, + CONTEXT_EVENT_TYPE, + CONTEXT_MESSAGE, + CONTEXT_NAME, + CONTEXT_SERVICE, + CONTEXT_STATE, + CONTEXT_USER_ID, + DOMAIN, + LOGBOOK_ENTITIES_FILTER, + LOGBOOK_ENTRY_DOMAIN, + LOGBOOK_ENTRY_ENTITY_ID, + LOGBOOK_ENTRY_ICON, + LOGBOOK_ENTRY_MESSAGE, + LOGBOOK_ENTRY_NAME, + LOGBOOK_ENTRY_STATE, + LOGBOOK_ENTRY_WHEN, + LOGBOOK_FILTERS, +) +from .helpers import is_sensor_continuous +from .models import EventAsRow, LazyEventPartialState, async_event_to_row +from .queries import statement_for_request +from .queries.common import PSUEDO_EVENT_STATE_CHANGED + +_LOGGER = logging.getLogger(__name__) + +ENTITY_ID_JSON_EXTRACT = re.compile('"entity_id": ?"([^"]+)"') +DOMAIN_JSON_EXTRACT = re.compile('"domain": ?"([^"]+)"') + + +@dataclass +class LogbookRun: + """A logbook run which may be a long running event stream or single request.""" + + context_lookup: ContextLookup + external_events: dict[ + str, tuple[str, Callable[[LazyEventPartialState], dict[str, Any]]] + ] + event_cache: EventCache + entity_name_cache: EntityNameCache + include_entity_name: bool + format_time: Callable[[Row], Any] + + +class EventProcessor: + """Stream into logbook format.""" + + def __init__( + self, + hass: HomeAssistant, + event_types: tuple[str, ...], + entity_ids: list[str] | None = None, + device_ids: list[str] | None = None, + context_id: str | None = None, + timestamp: bool = False, + include_entity_name: bool = True, + ) -> None: + """Init the event stream.""" + assert not ( + context_id and (entity_ids or device_ids) + ), "can't pass in both context_id and (entity_ids or device_ids)" + self.hass = hass + self.ent_reg = er.async_get(hass) + self.event_types = event_types + self.entity_ids = entity_ids + self.device_ids = device_ids + self.context_id = context_id + self.filters: Filters | None = hass.data[LOGBOOK_FILTERS] + if self.limited_select: + self.entities_filter: EntityFilter | Callable[[str], bool] | None = None + else: + self.entities_filter = hass.data[LOGBOOK_ENTITIES_FILTER] + format_time = ( + _row_time_fired_timestamp if timestamp else _row_time_fired_isoformat + ) + external_events: dict[ + 
str, tuple[str, Callable[[LazyEventPartialState], dict[str, Any]]] + ] = hass.data.get(DOMAIN, {}) + self.logbook_run = LogbookRun( + context_lookup=ContextLookup(hass), + external_events=external_events, + event_cache=EventCache({}), + entity_name_cache=EntityNameCache(self.hass), + include_entity_name=include_entity_name, + format_time=format_time, + ) + self.context_augmenter = ContextAugmenter(self.logbook_run) + + @property + def limited_select(self) -> bool: + """Check if the stream is limited by entities context or device ids.""" + return bool(self.entity_ids or self.context_id or self.device_ids) + + def switch_to_live(self) -> None: + """Switch to live stream. + + Clear caches so we can reduce memory pressure. + """ + self.logbook_run.event_cache.clear() + self.logbook_run.context_lookup.clear() + + def get_events( + self, + start_day: dt, + end_day: dt, + ) -> list[dict[str, Any]]: + """Get events for a period of time.""" + + def yield_rows(query: Query) -> Generator[Row, None, None]: + """Yield rows from the database.""" + # end_day - start_day intentionally checks .days and not .total_seconds() + # since we don't want to switch over to buffered if they go + # over one day by a few hours since the UI makes it so easy to do that. + if self.limited_select or (end_day - start_day).days <= 1: + return query.all() # type: ignore[no-any-return] + # Only buffer rows to reduce memory pressure + # if we expect the result set is going to be very large. + # What is considered very large is going to differ + # based on the hardware Home Assistant is running on. + # + # sqlalchemy suggests that is at least 10k, but for + # even and RPi3 that number seems higher in testing + # so we don't switch over until we request > 1 day+ of data. + # + return query.yield_per(1024) # type: ignore[no-any-return] + + stmt = statement_for_request( + start_day, + end_day, + self.event_types, + self.entity_ids, + self.device_ids, + self.filters, + self.context_id, + ) + if _LOGGER.isEnabledFor(logging.DEBUG): + _LOGGER.debug( + "Literal statement: %s", + stmt.compile(compile_kwargs={"literal_binds": True}), + ) + + with session_scope(hass=self.hass) as session: + return self.humanify(yield_rows(session.execute(stmt))) + + def humanify( + self, row_generator: Generator[Row | EventAsRow, None, None] + ) -> list[dict[str, str]]: + """Humanify rows.""" + return list( + _humanify( + row_generator, + self.entities_filter, + self.ent_reg, + self.logbook_run, + self.context_augmenter, + ) + ) + + +def _humanify( + rows: Generator[Row | EventAsRow, None, None], + entities_filter: EntityFilter | Callable[[str], bool] | None, + ent_reg: er.EntityRegistry, + logbook_run: LogbookRun, + context_augmenter: ContextAugmenter, +) -> Generator[dict[str, Any], None, None]: + """Generate a converted list of events into entries.""" + # Continuous sensors, will be excluded from the logbook + continuous_sensors: dict[str, bool] = {} + context_lookup = logbook_run.context_lookup + external_events = logbook_run.external_events + event_cache = logbook_run.event_cache + entity_name_cache = logbook_run.entity_name_cache + include_entity_name = logbook_run.include_entity_name + format_time = logbook_run.format_time + + def _keep_row(row: Row | EventAsRow, event_type: str) -> bool: + """Check if the entity_filter rejects a row.""" + assert entities_filter is not None + if entity_id := _row_event_data_extract(row, ENTITY_ID_JSON_EXTRACT): + return entities_filter(entity_id) + + if event_type in external_events: + # If the entity_id isn't 
described, use the domain that describes + # the event for filtering. + domain: str | None = external_events[event_type][0] + else: + domain = _row_event_data_extract(row, DOMAIN_JSON_EXTRACT) + + return domain is not None and entities_filter(f"{domain}._") + + # Process rows + for row in rows: + context_id = context_lookup.memorize(row) + if row.context_only: + continue + event_type = row.event_type + if event_type == EVENT_CALL_SERVICE or ( + event_type is not PSUEDO_EVENT_STATE_CHANGED + and entities_filter is not None + and not _keep_row(row, event_type) + ): + continue + + if event_type is PSUEDO_EVENT_STATE_CHANGED: + entity_id = row.entity_id + assert entity_id is not None + # Skip continuous sensors + if ( + is_continuous := continuous_sensors.get(entity_id) + ) is None and split_entity_id(entity_id)[0] == SENSOR_DOMAIN: + is_continuous = is_sensor_continuous(ent_reg, entity_id) + continuous_sensors[entity_id] = is_continuous + if is_continuous: + continue + + data = { + LOGBOOK_ENTRY_WHEN: format_time(row), + LOGBOOK_ENTRY_STATE: row.state, + LOGBOOK_ENTRY_ENTITY_ID: entity_id, + } + if include_entity_name: + data[LOGBOOK_ENTRY_NAME] = entity_name_cache.get(entity_id) + if icon := row.icon or row.old_format_icon: + data[LOGBOOK_ENTRY_ICON] = icon + + context_augmenter.augment(data, row, context_id) + yield data + + elif event_type in external_events: + domain, describe_event = external_events[event_type] + data = describe_event(event_cache.get(row)) + data[LOGBOOK_ENTRY_WHEN] = format_time(row) + data[LOGBOOK_ENTRY_DOMAIN] = domain + context_augmenter.augment(data, row, context_id) + yield data + + elif event_type == EVENT_LOGBOOK_ENTRY: + event = event_cache.get(row) + if not (event_data := event.data): + continue + entry_domain = event_data.get(ATTR_DOMAIN) + entry_entity_id = event_data.get(ATTR_ENTITY_ID) + if entry_domain is None and entry_entity_id is not None: + with suppress(IndexError): + entry_domain = split_entity_id(str(entry_entity_id))[0] + data = { + LOGBOOK_ENTRY_WHEN: format_time(row), + LOGBOOK_ENTRY_NAME: event_data.get(ATTR_NAME), + LOGBOOK_ENTRY_MESSAGE: event_data.get(ATTR_MESSAGE), + LOGBOOK_ENTRY_DOMAIN: entry_domain, + LOGBOOK_ENTRY_ENTITY_ID: entry_entity_id, + } + context_augmenter.augment(data, row, context_id) + yield data + + +class ContextLookup: + """A lookup class for context origins.""" + + def __init__(self, hass: HomeAssistant) -> None: + """Memorize context origin.""" + self.hass = hass + self._memorize_new = True + self._lookup: dict[str | None, Row | EventAsRow | None] = {None: None} + + def memorize(self, row: Row) -> str | None: + """Memorize a context from the database.""" + if self._memorize_new: + context_id: str = row.context_id + self._lookup.setdefault(context_id, row) + return context_id + return None + + def clear(self) -> None: + """Clear the context origins and stop recording new ones.""" + self._lookup.clear() + self._memorize_new = False + + def get(self, context_id: str) -> Row | None: + """Get the context origin.""" + return self._lookup.get(context_id) + + +class ContextAugmenter: + """Augment data with context trace.""" + + def __init__(self, logbook_run: LogbookRun) -> None: + """Init the augmenter.""" + self.context_lookup = logbook_run.context_lookup + self.entity_name_cache = logbook_run.entity_name_cache + self.external_events = logbook_run.external_events + self.event_cache = logbook_run.event_cache + self.include_entity_name = logbook_run.include_entity_name + + def _get_context_row( + self, context_id: str | None, 
row: Row | EventAsRow + ) -> Row | EventAsRow: + """Get the context row from the id or row context.""" + if context_id: + return self.context_lookup.get(context_id) + if (context := getattr(row, "context", None)) is not None and ( + origin_event := context.origin_event + ) is not None: + return async_event_to_row(origin_event) + return None + + def augment( + self, data: dict[str, Any], row: Row | EventAsRow, context_id: str | None + ) -> None: + """Augment data from the row and cache.""" + if context_user_id := row.context_user_id: + data[CONTEXT_USER_ID] = context_user_id + + if not (context_row := self._get_context_row(context_id, row)): + return + + if _rows_match(row, context_row): + # This is the first event with the given ID. Was it directly caused by + # a parent event? + if ( + not row.context_parent_id + or ( + context_row := self._get_context_row( + row.context_parent_id, context_row + ) + ) + is None + ): + return + # Ensure the (parent) context_event exists and is not the root cause of + # this log entry. + if _rows_match(row, context_row): + return + event_type = context_row.event_type + # State change + if context_entity_id := context_row.entity_id: + data[CONTEXT_STATE] = context_row.state + data[CONTEXT_ENTITY_ID] = context_entity_id + if self.include_entity_name: + data[CONTEXT_ENTITY_ID_NAME] = self.entity_name_cache.get( + context_entity_id + ) + return + + # Call service + if event_type == EVENT_CALL_SERVICE: + event = self.event_cache.get(context_row) + event_data = event.data + data[CONTEXT_DOMAIN] = event_data.get(ATTR_DOMAIN) + data[CONTEXT_SERVICE] = event_data.get(ATTR_SERVICE) + data[CONTEXT_EVENT_TYPE] = event_type + return + + if event_type not in self.external_events: + return + + domain, describe_event = self.external_events[event_type] + data[CONTEXT_EVENT_TYPE] = event_type + data[CONTEXT_DOMAIN] = domain + event = self.event_cache.get(context_row) + described = describe_event(event) + if name := described.get(ATTR_NAME): + data[CONTEXT_NAME] = name + if message := described.get(ATTR_MESSAGE): + data[CONTEXT_MESSAGE] = message + if not (attr_entity_id := described.get(ATTR_ENTITY_ID)): + return + data[CONTEXT_ENTITY_ID] = attr_entity_id + if self.include_entity_name: + data[CONTEXT_ENTITY_ID_NAME] = self.entity_name_cache.get(attr_entity_id) + + +def _rows_match(row: Row | EventAsRow, other_row: Row | EventAsRow) -> bool: + """Check of rows match by using the same method as Events __hash__.""" + if ( + (state_id := row.state_id) is not None + and state_id == other_row.state_id + or (event_id := row.event_id) is not None + and event_id == other_row.event_id + ): + return True + return False + + +def _row_event_data_extract(row: Row | EventAsRow, extractor: re.Pattern) -> str | None: + """Extract from event_data row.""" + result = extractor.search(row.shared_data or row.event_data or "") + return result.group(1) if result else None + + +def _row_time_fired_isoformat(row: Row | EventAsRow) -> str: + """Convert the row timed_fired to isoformat.""" + return process_timestamp_to_utc_isoformat(row.time_fired or dt_util.utcnow()) + + +def _row_time_fired_timestamp(row: Row | EventAsRow) -> float: + """Convert the row timed_fired to timestamp.""" + return process_datetime_to_timestamp(row.time_fired or dt_util.utcnow()) + + +class EntityNameCache: + """A cache to lookup the name for an entity. + + This class should not be used to lookup attributes + that are expected to change state. 
+ """ + + def __init__(self, hass: HomeAssistant) -> None: + """Init the cache.""" + self._hass = hass + self._names: dict[str, str] = {} + + def get(self, entity_id: str) -> str: + """Lookup an the friendly name.""" + if entity_id in self._names: + return self._names[entity_id] + if (current_state := self._hass.states.get(entity_id)) and ( + friendly_name := current_state.attributes.get(ATTR_FRIENDLY_NAME) + ): + self._names[entity_id] = friendly_name + else: + return split_entity_id(entity_id)[1].replace("_", " ") + + return self._names[entity_id] + + +class EventCache: + """Cache LazyEventPartialState by row.""" + + def __init__(self, event_data_cache: dict[str, dict[str, Any]]) -> None: + """Init the cache.""" + self._event_data_cache = event_data_cache + self.event_cache: dict[Row | EventAsRow, LazyEventPartialState] = {} + + def get(self, row: EventAsRow | Row) -> LazyEventPartialState: + """Get the event from the row.""" + if isinstance(row, EventAsRow): + return LazyEventPartialState(row, self._event_data_cache) + if event := self.event_cache.get(row): + return event + self.event_cache[row] = lazy_event = LazyEventPartialState( + row, self._event_data_cache + ) + return lazy_event + + def clear(self) -> None: + """Clear the event cache.""" + self._event_data_cache = {} + self.event_cache = {} diff --git a/homeassistant/components/logbook/rest_api.py b/homeassistant/components/logbook/rest_api.py new file mode 100644 index 00000000000..a1a7db3ed2c --- /dev/null +++ b/homeassistant/components/logbook/rest_api.py @@ -0,0 +1,120 @@ +"""Event parser and human readable log generator.""" +from __future__ import annotations + +from datetime import timedelta +from http import HTTPStatus +from typing import Any, cast + +from aiohttp import web +import voluptuous as vol + +from homeassistant.components.http import HomeAssistantView +from homeassistant.components.recorder import get_instance +from homeassistant.components.recorder.filters import Filters +from homeassistant.core import HomeAssistant, callback +from homeassistant.exceptions import InvalidEntityFormatError +from homeassistant.helpers import config_validation as cv +from homeassistant.helpers.entityfilter import EntityFilter +from homeassistant.helpers.typing import ConfigType +import homeassistant.util.dt as dt_util + +from .helpers import async_determine_event_types +from .processor import EventProcessor + + +@callback +def async_setup( + hass: HomeAssistant, + conf: ConfigType, + filters: Filters | None, + entities_filter: EntityFilter | None, +) -> None: + """Set up the logbook rest API.""" + hass.http.register_view(LogbookView(conf, filters, entities_filter)) + + +class LogbookView(HomeAssistantView): + """Handle logbook view requests.""" + + url = "/api/logbook" + name = "api:logbook" + extra_urls = ["/api/logbook/{datetime}"] + + def __init__( + self, + config: dict[str, Any], + filters: Filters | None, + entities_filter: EntityFilter | None, + ) -> None: + """Initialize the logbook view.""" + self.config = config + self.filters = filters + self.entities_filter = entities_filter + + async def get( + self, request: web.Request, datetime: str | None = None + ) -> web.Response: + """Retrieve logbook entries.""" + if datetime: + if (datetime_dt := dt_util.parse_datetime(datetime)) is None: + return self.json_message("Invalid datetime", HTTPStatus.BAD_REQUEST) + else: + datetime_dt = dt_util.start_of_local_day() + + if (period_str := request.query.get("period")) is None: + period: int = 1 + else: + period = int(period_str) + + 
if entity_ids_str := request.query.get("entity"): + try: + entity_ids = cv.entity_ids(entity_ids_str) + except vol.Invalid: + raise InvalidEntityFormatError( + f"Invalid entity id(s) encountered: {entity_ids_str}. " + "Format should be ." + ) from vol.Invalid + else: + entity_ids = None + + if (end_time_str := request.query.get("end_time")) is None: + start_day = dt_util.as_utc(datetime_dt) - timedelta(days=period - 1) + end_day = start_day + timedelta(days=period) + else: + start_day = datetime_dt + if (end_day_dt := dt_util.parse_datetime(end_time_str)) is None: + return self.json_message("Invalid end_time", HTTPStatus.BAD_REQUEST) + end_day = end_day_dt + + hass = request.app["hass"] + + context_id = request.query.get("context_id") + + if entity_ids and context_id: + return self.json_message( + "Can't combine entity with context_id", HTTPStatus.BAD_REQUEST + ) + + event_types = async_determine_event_types(hass, entity_ids, None) + event_processor = EventProcessor( + hass, + event_types, + entity_ids, + None, + context_id, + timestamp=False, + include_entity_name=True, + ) + + def json_events() -> web.Response: + """Fetch events and generate JSON.""" + return self.json( + event_processor.get_events( + start_day, + end_day, + ) + ) + + return cast( + web.Response, await get_instance(hass).async_add_executor_job(json_events) + ) diff --git a/homeassistant/components/logbook/websocket_api.py b/homeassistant/components/logbook/websocket_api.py new file mode 100644 index 00000000000..efb15efe97b --- /dev/null +++ b/homeassistant/components/logbook/websocket_api.py @@ -0,0 +1,319 @@ +"""Event parser and human readable log generator.""" +from __future__ import annotations + +import asyncio +from collections.abc import Callable +from datetime import datetime as dt, timedelta +import logging +from typing import Any + +import voluptuous as vol + +from homeassistant.components import websocket_api +from homeassistant.components.recorder import get_instance +from homeassistant.components.websocket_api import messages +from homeassistant.components.websocket_api.connection import ActiveConnection +from homeassistant.components.websocket_api.const import JSON_DUMP +from homeassistant.core import CALLBACK_TYPE, Event, HomeAssistant, callback +import homeassistant.util.dt as dt_util + +from .helpers import ( + async_determine_event_types, + async_filter_entities, + async_subscribe_events, +) +from .models import async_event_to_row +from .processor import EventProcessor + +MAX_PENDING_LOGBOOK_EVENTS = 2048 +EVENT_COALESCE_TIME = 0.5 +MAX_RECORDER_WAIT = 10 + +_LOGGER = logging.getLogger(__name__) + + +@callback +def async_setup(hass: HomeAssistant) -> None: + """Set up the logbook websocket API.""" + websocket_api.async_register_command(hass, ws_get_events) + websocket_api.async_register_command(hass, ws_event_stream) + + +async def _async_get_ws_formatted_events( + hass: HomeAssistant, + msg_id: int, + start_time: dt, + end_time: dt, + formatter: Callable[[int, Any], dict[str, Any]], + event_processor: EventProcessor, +) -> tuple[str, dt | None]: + """Async wrapper around _ws_formatted_get_events.""" + return await get_instance(hass).async_add_executor_job( + _ws_formatted_get_events, + msg_id, + start_time, + end_time, + formatter, + event_processor, + ) + + +def _ws_formatted_get_events( + msg_id: int, + start_day: dt, + end_day: dt, + formatter: Callable[[int, Any], dict[str, Any]], + event_processor: EventProcessor, +) -> tuple[str, dt | None]: + """Fetch events and convert them to json in the 
executor.""" + events = event_processor.get_events(start_day, end_day) + last_time = None + if events: + last_time = dt_util.utc_from_timestamp(events[-1]["when"]) + result = formatter(msg_id, events) + return JSON_DUMP(result), last_time + + +async def _async_events_consumer( + setup_complete_future: asyncio.Future[dt], + connection: ActiveConnection, + msg_id: int, + stream_queue: asyncio.Queue[Event], + event_processor: EventProcessor, +) -> None: + """Stream events from the queue.""" + subscriptions_setup_complete_time = await setup_complete_future + event_processor.switch_to_live() + + while True: + events: list[Event] = [await stream_queue.get()] + # If the event is older than the last db + # event we already sent it so we skip it. + if events[0].time_fired <= subscriptions_setup_complete_time: + continue + # We sleep for the EVENT_COALESCE_TIME so + # we can group events together to minimize + # the number of websocket messages when the + # system is overloaded with an event storm + await asyncio.sleep(EVENT_COALESCE_TIME) + while not stream_queue.empty(): + events.append(stream_queue.get_nowait()) + + if logbook_events := event_processor.humanify( + async_event_to_row(e) for e in events + ): + connection.send_message( + JSON_DUMP( + messages.event_message( + msg_id, + logbook_events, + ) + ) + ) + + +@websocket_api.websocket_command( + { + vol.Required("type"): "logbook/event_stream", + vol.Required("start_time"): str, + vol.Optional("entity_ids"): [str], + vol.Optional("device_ids"): [str], + } +) +@websocket_api.async_response +async def ws_event_stream( + hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict +) -> None: + """Handle logbook stream events websocket command.""" + start_time_str = msg["start_time"] + utc_now = dt_util.utcnow() + + if start_time := dt_util.parse_datetime(start_time_str): + start_time = dt_util.as_utc(start_time) + + if not start_time or start_time > utc_now: + connection.send_error(msg["id"], "invalid_start_time", "Invalid start_time") + return + + device_ids = msg.get("device_ids") + entity_ids = msg.get("entity_ids") + if entity_ids: + entity_ids = async_filter_entities(hass, entity_ids) + event_types = async_determine_event_types(hass, entity_ids, device_ids) + + event_processor = EventProcessor( + hass, + event_types, + entity_ids, + device_ids, + None, + timestamp=True, + include_entity_name=False, + ) + + stream_queue: asyncio.Queue[Event] = asyncio.Queue(MAX_PENDING_LOGBOOK_EVENTS) + subscriptions: list[CALLBACK_TYPE] = [] + setup_complete_future: asyncio.Future[dt] = asyncio.Future() + task = asyncio.create_task( + _async_events_consumer( + setup_complete_future, + connection, + msg["id"], + stream_queue, + event_processor, + ) + ) + + def _unsub() -> None: + """Unsubscribe from all events.""" + for subscription in subscriptions: + subscription() + subscriptions.clear() + if task: + task.cancel() + + @callback + def _queue_or_cancel(event: Event) -> None: + """Queue an event to be processed or cancel.""" + try: + stream_queue.put_nowait(event) + except asyncio.QueueFull: + _LOGGER.debug( + "Client exceeded max pending messages of %s", + MAX_PENDING_LOGBOOK_EVENTS, + ) + _unsub() + + async_subscribe_events( + hass, subscriptions, _queue_or_cancel, event_types, entity_ids, device_ids + ) + subscriptions_setup_complete_time = dt_util.utcnow() + connection.subscriptions[msg["id"]] = _unsub + connection.send_result(msg["id"]) + + # Fetch everything from history + message, last_event_time = await _async_get_ws_formatted_events( 
+ hass, + msg["id"], + start_time, + subscriptions_setup_complete_time, + messages.event_message, + event_processor, + ) + # If there is no last_time there are no historical + # results, but we still send an empty message so + # consumers of the api know their request was + # answered but there were no results + connection.send_message(message) + try: + await asyncio.wait_for( + get_instance(hass).async_block_till_done(), MAX_RECORDER_WAIT + ) + except asyncio.TimeoutError: + _LOGGER.debug( + "Recorder is behind more than %s seconds, starting live stream; Some results may be missing" + ) + + if setup_complete_future.cancelled(): + # Unsubscribe happened while waiting for recorder + return + + # + # Fetch any events from the database that have + # not been committed since the original fetch + # so we can switch over to using the subscriptions + # + # We only want events that happened after the last event + # we had from the last database query or the maximum + # time we allow the recorder to be behind + # + max_recorder_behind = subscriptions_setup_complete_time - timedelta( + seconds=MAX_RECORDER_WAIT + ) + second_fetch_start_time = max( + last_event_time or max_recorder_behind, max_recorder_behind + ) + message, final_cutoff_time = await _async_get_ws_formatted_events( + hass, + msg["id"], + second_fetch_start_time, + subscriptions_setup_complete_time, + messages.event_message, + event_processor, + ) + if final_cutoff_time: # Only sends results if we have them + connection.send_message(message) + + if not setup_complete_future.cancelled(): + # Unsubscribe happened while waiting for formatted events + setup_complete_future.set_result(subscriptions_setup_complete_time) + + +@websocket_api.websocket_command( + { + vol.Required("type"): "logbook/get_events", + vol.Required("start_time"): str, + vol.Optional("end_time"): str, + vol.Optional("entity_ids"): [str], + vol.Optional("device_ids"): [str], + vol.Optional("context_id"): str, + } +) +@websocket_api.async_response +async def ws_get_events( + hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict +) -> None: + """Handle logbook get events websocket command.""" + start_time_str = msg["start_time"] + end_time_str = msg.get("end_time") + utc_now = dt_util.utcnow() + + if start_time := dt_util.parse_datetime(start_time_str): + start_time = dt_util.as_utc(start_time) + else: + connection.send_error(msg["id"], "invalid_start_time", "Invalid start_time") + return + + if not end_time_str: + end_time = utc_now + elif parsed_end_time := dt_util.parse_datetime(end_time_str): + end_time = dt_util.as_utc(parsed_end_time) + else: + connection.send_error(msg["id"], "invalid_end_time", "Invalid end_time") + return + + if start_time > utc_now: + connection.send_result(msg["id"], []) + return + + device_ids = msg.get("device_ids") + entity_ids = msg.get("entity_ids") + context_id = msg.get("context_id") + if entity_ids: + entity_ids = async_filter_entities(hass, entity_ids) + if not entity_ids and not device_ids: + # Everything has been filtered away + connection.send_result(msg["id"], []) + return + + event_types = async_determine_event_types(hass, entity_ids, device_ids) + + event_processor = EventProcessor( + hass, + event_types, + entity_ids, + device_ids, + context_id, + timestamp=True, + include_entity_name=False, + ) + + message, _ = await _async_get_ws_formatted_events( + hass, + msg["id"], + start_time, + end_time, + messages.result_message, + event_processor, + ) + connection.send_message(message) diff --git 
a/homeassistant/components/recorder/core.py b/homeassistant/components/recorder/core.py index 5e6fafdfa61..716e3a19d09 100644 --- a/homeassistant/components/recorder/core.py +++ b/homeassistant/components/recorder/core.py @@ -75,6 +75,7 @@ from .tasks import ( RecorderTask, StatisticsTask, StopTask, + SynchronizeTask, UpdateStatisticsMetadataTask, WaitTask, ) @@ -928,6 +929,12 @@ class Recorder(threading.Thread): if self._async_event_filter(event): self.queue_task(EventTask(event)) + async def async_block_till_done(self) -> None: + """Async version of block_till_done.""" + event = asyncio.Event() + self.queue_task(SynchronizeTask(event)) + await event.wait() + def block_till_done(self) -> None: """Block till all events processed. diff --git a/homeassistant/components/recorder/tasks.py b/homeassistant/components/recorder/tasks.py index e12526b316a..07855d27dff 100644 --- a/homeassistant/components/recorder/tasks.py +++ b/homeassistant/components/recorder/tasks.py @@ -248,3 +248,17 @@ class AddRecorderPlatformTask(RecorderTask): platforms[domain] = platform if hasattr(self.platform, "exclude_attributes"): hass.data[EXCLUDE_ATTRIBUTES][domain] = platform.exclude_attributes(hass) + + +@dataclass +class SynchronizeTask(RecorderTask): + """Ensure all pending data has been committed.""" + + # commit_before is the default + event: asyncio.Event + + def run(self, instance: Recorder) -> None: + """Handle the task.""" + # Does not use a tracked task to avoid + # blocking shutdown if the recorder is broken + instance.hass.loop.call_soon_threadsafe(self.event.set) diff --git a/homeassistant/components/websocket_api/messages.py b/homeassistant/components/websocket_api/messages.py index 1c09bc1d567..f546ba5eec6 100644 --- a/homeassistant/components/websocket_api/messages.py +++ b/homeassistant/components/websocket_api/messages.py @@ -61,7 +61,7 @@ def error_message(iden: int | None, code: str, message: str) -> dict[str, Any]: } -def event_message(iden: JSON_TYPE, event: Any) -> dict[str, Any]: +def event_message(iden: JSON_TYPE | int, event: Any) -> dict[str, Any]: """Return an event message.""" return {"id": iden, "type": "event", "event": event} diff --git a/homeassistant/core.py b/homeassistant/core.py index 7fcf07d9c66..2753b801347 100644 --- a/homeassistant/core.py +++ b/homeassistant/core.py @@ -716,13 +716,14 @@ class HomeAssistant: self._stopped.set() -@attr.s(slots=True, frozen=True) +@attr.s(slots=True, frozen=False) class Context: """The context that triggered something.""" user_id: str | None = attr.ib(default=None) parent_id: str | None = attr.ib(default=None) id: str = attr.ib(factory=ulid_util.ulid) + origin_event: Event | None = attr.ib(default=None, eq=False) def as_dict(self) -> dict[str, str | None]: """Return a dictionary representation of the context.""" @@ -866,6 +867,8 @@ class EventBus: listeners = match_all_listeners + listeners event = Event(event_type, event_data, origin, time_fired, context) + if not event.context.origin_event: + event.context.origin_event = event _LOGGER.debug("Bus:Handling %s", event) diff --git a/tests/components/logbook/common.py b/tests/components/logbook/common.py index 9d6f35eb702..b88c3854967 100644 --- a/tests/components/logbook/common.py +++ b/tests/components/logbook/common.py @@ -5,6 +5,7 @@ import json from typing import Any from homeassistant.components import logbook +from homeassistant.components.logbook import processor from homeassistant.components.recorder.models import process_timestamp_to_utc_isoformat from homeassistant.core import 
Context from homeassistant.helpers import entity_registry as er @@ -50,16 +51,26 @@ class MockRow: def mock_humanify(hass_, rows): """Wrap humanify with mocked logbook objects.""" - entity_name_cache = logbook.EntityNameCache(hass_) + entity_name_cache = processor.EntityNameCache(hass_) ent_reg = er.async_get(hass_) + event_cache = processor.EventCache({}) + context_lookup = processor.ContextLookup(hass_) external_events = hass_.data.get(logbook.DOMAIN, {}) + logbook_run = processor.LogbookRun( + context_lookup, + external_events, + event_cache, + entity_name_cache, + include_entity_name=True, + format_time=processor._row_time_fired_isoformat, + ) + context_augmenter = processor.ContextAugmenter(logbook_run) return list( - logbook._humanify( + processor._humanify( rows, None, ent_reg, - external_events, - entity_name_cache, - logbook._row_time_fired_isoformat, + logbook_run, + context_augmenter, ), ) diff --git a/tests/components/logbook/test_init.py b/tests/components/logbook/test_init.py index 3b18c594e6e..101fb74e690 100644 --- a/tests/components/logbook/test_init.py +++ b/tests/components/logbook/test_init.py @@ -14,6 +14,9 @@ import voluptuous as vol from homeassistant.components import logbook from homeassistant.components.alexa.smart_home import EVENT_ALEXA_SMART_HOME from homeassistant.components.automation import EVENT_AUTOMATION_TRIGGERED +from homeassistant.components.logbook.models import LazyEventPartialState +from homeassistant.components.logbook.processor import EventProcessor +from homeassistant.components.logbook.queries.common import PSUEDO_EVENT_STATE_CHANGED from homeassistant.components.script import EVENT_SCRIPT_STARTED from homeassistant.components.sensor import SensorStateClass from homeassistant.const import ( @@ -95,15 +98,12 @@ async def test_service_call_create_logbook_entry(hass_): # Our service call will unblock when the event listeners have been # scheduled. This means that they may not have been processed yet. 
await async_wait_recording_done(hass_)
-    ent_reg = er.async_get(hass_)
 
+    event_processor = EventProcessor(hass_, (EVENT_LOGBOOK_ENTRY,))
     events = list(
-        logbook._get_events(
-            hass_,
+        event_processor.get_events(
             dt_util.utcnow() - timedelta(hours=1),
             dt_util.utcnow() + timedelta(hours=1),
-            (EVENT_LOGBOOK_ENTRY,),
-            ent_reg,
         )
     )
     assert len(events) == 2
@@ -137,15 +137,11 @@ async def test_service_call_create_logbook_entry_invalid_entity_id(hass, recorde
         },
     )
     await async_wait_recording_done(hass)
-    ent_reg = er.async_get(hass)
-
+    event_processor = EventProcessor(hass, (EVENT_LOGBOOK_ENTRY,))
     events = list(
-        logbook._get_events(
-            hass,
+        event_processor.get_events(
             dt_util.utcnow() - timedelta(hours=1),
             dt_util.utcnow() + timedelta(hours=1),
-            (EVENT_LOGBOOK_ENTRY,),
-            ent_reg,
         )
     )
     assert len(events) == 1
@@ -335,7 +331,7 @@ def create_state_changed_event_from_old_new(
         ],
     )
 
-    row.event_type = logbook.PSUEDO_EVENT_STATE_CHANGED
+    row.event_type = PSUEDO_EVENT_STATE_CHANGED
     row.event_data = "{}"
     row.shared_data = "{}"
     row.attributes = attributes_json
@@ -353,7 +349,7 @@ def create_state_changed_event_from_old_new(
     row.context_parent_id = None
     row.old_state_id = old_state and 1
     row.state_id = new_state and 1
-    return logbook.LazyEventPartialState(row, {})
+    return LazyEventPartialState(row, {})
 
 
 async def test_logbook_view(hass, hass_client, recorder_mock):
diff --git a/tests/components/logbook/test_websocket_api.py b/tests/components/logbook/test_websocket_api.py
new file mode 100644
index 00000000000..585732b66f1
--- /dev/null
+++ b/tests/components/logbook/test_websocket_api.py
@@ -0,0 +1,1166 @@
+"""The tests for the logbook component."""
+import asyncio
+from collections.abc import Callable
+from datetime import timedelta
+from unittest.mock import ANY, patch
+
+import pytest
+
+from homeassistant import core
+from homeassistant.components import logbook, recorder
+from homeassistant.components.automation import EVENT_AUTOMATION_TRIGGERED
+from homeassistant.components.logbook import websocket_api
+from homeassistant.components.script import EVENT_SCRIPT_STARTED
+from homeassistant.components.websocket_api.const import TYPE_RESULT
+from homeassistant.const import (
+    ATTR_ENTITY_ID,
+    ATTR_FRIENDLY_NAME,
+    ATTR_NAME,
+    ATTR_UNIT_OF_MEASUREMENT,
+    EVENT_HOMEASSISTANT_START,
+    STATE_OFF,
+    STATE_ON,
+)
+from homeassistant.core import Event, HomeAssistant, State
+from homeassistant.helpers import device_registry
+from homeassistant.setup import async_setup_component
+import homeassistant.util.dt as dt_util
+
+from tests.common import MockConfigEntry, SetupRecorderInstanceT
+from tests.components.recorder.common import (
+    async_block_recorder,
+    async_recorder_block_till_done,
+    async_wait_recording_done,
+)
+
+
+@pytest.fixture()
+def set_utc(hass):
+    """Set timezone to UTC."""
+    hass.config.set_time_zone("UTC")
+
+
+async def _async_mock_device_with_logbook_platform(hass):
+    """Mock an integration that provides a device that is described by the logbook."""
+    entry = MockConfigEntry(domain="test", data={"first": True}, options=None)
+    entry.add_to_hass(hass)
+    dev_reg = device_registry.async_get(hass)
+    device = dev_reg.async_get_or_create(
+        config_entry_id=entry.entry_id,
+        connections={(device_registry.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")},
+        identifiers={("bridgeid", "0123")},
+        sw_version="sw-version",
+        name="device name",
+        manufacturer="manufacturer",
+        model="model",
+        suggested_area="Game Room",
+    )
+
+    class MockLogbookPlatform:
+        """Mock a logbook platform."""
+
+        
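# A logbook platform exposes async_describe_events, which registers one
+        # describe callback per (domain, event type) pair it can humanify.
+        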
@core.callback + def async_describe_events( + hass: HomeAssistant, + async_describe_event: Callable[ + [str, str, Callable[[Event], dict[str, str]]], None + ], + ) -> None: + """Describe logbook events.""" + + @core.callback + def async_describe_test_event(event: Event) -> dict[str, str]: + """Describe mock logbook event.""" + return { + "name": "device name", + "message": event.data.get("message", "is on fire"), + } + + async_describe_event("test", "mock_event", async_describe_test_event) + + await logbook._process_logbook_platform(hass, "test", MockLogbookPlatform) + return device + + +async def test_get_events(hass, hass_ws_client, recorder_mock): + """Test logbook get_events.""" + now = dt_util.utcnow() + await asyncio.gather( + *[ + async_setup_component(hass, comp, {}) + for comp in ("homeassistant", "logbook") + ] + ) + await async_recorder_block_till_done(hass) + + hass.bus.async_fire(EVENT_HOMEASSISTANT_START) + + hass.states.async_set("light.kitchen", STATE_OFF) + await hass.async_block_till_done() + hass.states.async_set("light.kitchen", STATE_ON, {"brightness": 100}) + await hass.async_block_till_done() + hass.states.async_set("light.kitchen", STATE_ON, {"brightness": 200}) + await hass.async_block_till_done() + hass.states.async_set("light.kitchen", STATE_ON, {"brightness": 300}) + await hass.async_block_till_done() + hass.states.async_set("light.kitchen", STATE_ON, {"brightness": 400}) + await hass.async_block_till_done() + context = core.Context( + id="ac5bd62de45711eaaeb351041eec8dd9", + user_id="b400facee45711eaa9308bfd3d19e474", + ) + + hass.states.async_set("light.kitchen", STATE_OFF, context=context) + await hass.async_block_till_done() + + await async_wait_recording_done(hass) + + client = await hass_ws_client() + await client.send_json( + { + "id": 1, + "type": "logbook/get_events", + "start_time": now.isoformat(), + "end_time": now.isoformat(), + "entity_ids": ["light.kitchen"], + } + ) + response = await client.receive_json() + assert response["success"] + assert response["result"] == [] + + await client.send_json( + { + "id": 2, + "type": "logbook/get_events", + "start_time": now.isoformat(), + "entity_ids": ["sensor.test"], + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 2 + assert response["result"] == [] + + await client.send_json( + { + "id": 3, + "type": "logbook/get_events", + "start_time": now.isoformat(), + "entity_ids": ["light.kitchen"], + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 3 + + results = response["result"] + assert results[0]["entity_id"] == "light.kitchen" + assert results[0]["state"] == "on" + assert results[1]["entity_id"] == "light.kitchen" + assert results[1]["state"] == "off" + + await client.send_json( + { + "id": 4, + "type": "logbook/get_events", + "start_time": now.isoformat(), + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 4 + + results = response["result"] + assert len(results) == 3 + assert results[0]["message"] == "started" + assert results[1]["entity_id"] == "light.kitchen" + assert results[1]["state"] == "on" + assert isinstance(results[1]["when"], float) + assert results[2]["entity_id"] == "light.kitchen" + assert results[2]["state"] == "off" + assert isinstance(results[2]["when"], float) + + await client.send_json( + { + "id": 5, + "type": "logbook/get_events", + "start_time": now.isoformat(), + "context_id": "ac5bd62de45711eaaeb351041eec8dd9", + } + ) + 
response = await client.receive_json() + assert response["success"] + assert response["id"] == 5 + + results = response["result"] + assert len(results) == 1 + assert results[0]["entity_id"] == "light.kitchen" + assert results[0]["state"] == "off" + assert isinstance(results[0]["when"], float) + + +async def test_get_events_entities_filtered_away(hass, hass_ws_client, recorder_mock): + """Test logbook get_events all entities filtered away.""" + now = dt_util.utcnow() + await asyncio.gather( + *[ + async_setup_component(hass, comp, {}) + for comp in ("homeassistant", "logbook") + ] + ) + await async_recorder_block_till_done(hass) + + hass.bus.async_fire(EVENT_HOMEASSISTANT_START) + + hass.states.async_set("light.kitchen", STATE_ON) + await hass.async_block_till_done() + hass.states.async_set( + "light.filtered", STATE_ON, {"brightness": 100, ATTR_UNIT_OF_MEASUREMENT: "any"} + ) + await hass.async_block_till_done() + hass.states.async_set("light.kitchen", STATE_OFF, {"brightness": 200}) + await hass.async_block_till_done() + hass.states.async_set( + "light.filtered", + STATE_OFF, + {"brightness": 300, ATTR_UNIT_OF_MEASUREMENT: "any"}, + ) + + await async_wait_recording_done(hass) + client = await hass_ws_client() + + await client.send_json( + { + "id": 1, + "type": "logbook/get_events", + "start_time": now.isoformat(), + "entity_ids": ["light.kitchen"], + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 1 + + results = response["result"] + assert results[0]["entity_id"] == "light.kitchen" + assert results[0]["state"] == "off" + + await client.send_json( + { + "id": 2, + "type": "logbook/get_events", + "start_time": now.isoformat(), + "entity_ids": ["light.filtered"], + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 2 + + results = response["result"] + assert len(results) == 0 + + +async def test_get_events_future_start_time(hass, hass_ws_client, recorder_mock): + """Test get_events with a future start time.""" + await async_setup_component(hass, "logbook", {}) + await async_recorder_block_till_done(hass) + future = dt_util.utcnow() + timedelta(hours=10) + + client = await hass_ws_client() + await client.send_json( + { + "id": 1, + "type": "logbook/get_events", + "start_time": future.isoformat(), + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 1 + + results = response["result"] + assert isinstance(results, list) + assert len(results) == 0 + + +async def test_get_events_bad_start_time(hass, hass_ws_client, recorder_mock): + """Test get_events bad start time.""" + await async_setup_component(hass, "logbook", {}) + await async_recorder_block_till_done(hass) + + client = await hass_ws_client() + await client.send_json( + { + "id": 1, + "type": "logbook/get_events", + "start_time": "cats", + } + ) + response = await client.receive_json() + assert not response["success"] + assert response["error"]["code"] == "invalid_start_time" + + +async def test_get_events_bad_end_time(hass, hass_ws_client, recorder_mock): + """Test get_events bad end time.""" + now = dt_util.utcnow() + await async_setup_component(hass, "logbook", {}) + await async_recorder_block_till_done(hass) + + client = await hass_ws_client() + await client.send_json( + { + "id": 1, + "type": "logbook/get_events", + "start_time": now.isoformat(), + "end_time": "dogs", + } + ) + response = await client.receive_json() + assert not response["success"] + assert 
response["error"]["code"] == "invalid_end_time" + + +async def test_get_events_invalid_filters(hass, hass_ws_client, recorder_mock): + """Test get_events invalid filters.""" + await async_setup_component(hass, "logbook", {}) + await async_recorder_block_till_done(hass) + + client = await hass_ws_client() + await client.send_json( + { + "id": 1, + "type": "logbook/get_events", + "entity_ids": [], + } + ) + response = await client.receive_json() + assert not response["success"] + assert response["error"]["code"] == "invalid_format" + await client.send_json( + { + "id": 2, + "type": "logbook/get_events", + "device_ids": [], + } + ) + response = await client.receive_json() + assert not response["success"] + assert response["error"]["code"] == "invalid_format" + + +async def test_get_events_with_device_ids(hass, hass_ws_client, recorder_mock): + """Test logbook get_events for device ids.""" + now = dt_util.utcnow() + await asyncio.gather( + *[ + async_setup_component(hass, comp, {}) + for comp in ("homeassistant", "logbook") + ] + ) + + device = await _async_mock_device_with_logbook_platform(hass) + + hass.bus.async_fire(EVENT_HOMEASSISTANT_START) + hass.bus.async_fire("mock_event", {"device_id": device.id}) + + hass.states.async_set("light.kitchen", STATE_OFF) + await hass.async_block_till_done() + hass.states.async_set("light.kitchen", STATE_ON, {"brightness": 100}) + await hass.async_block_till_done() + hass.states.async_set("light.kitchen", STATE_ON, {"brightness": 200}) + await hass.async_block_till_done() + hass.states.async_set("light.kitchen", STATE_ON, {"brightness": 300}) + await hass.async_block_till_done() + hass.states.async_set("light.kitchen", STATE_ON, {"brightness": 400}) + await hass.async_block_till_done() + context = core.Context( + id="ac5bd62de45711eaaeb351041eec8dd9", + user_id="b400facee45711eaa9308bfd3d19e474", + ) + + hass.states.async_set("light.kitchen", STATE_OFF, context=context) + await hass.async_block_till_done() + + await async_wait_recording_done(hass) + client = await hass_ws_client() + + await client.send_json( + { + "id": 1, + "type": "logbook/get_events", + "start_time": now.isoformat(), + "device_ids": [device.id], + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 1 + + results = response["result"] + assert len(results) == 1 + assert results[0]["name"] == "device name" + assert results[0]["message"] == "is on fire" + assert isinstance(results[0]["when"], float) + + await client.send_json( + { + "id": 2, + "type": "logbook/get_events", + "start_time": now.isoformat(), + "entity_ids": ["light.kitchen"], + "device_ids": [device.id], + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 2 + + results = response["result"] + assert results[0]["domain"] == "test" + assert results[0]["message"] == "is on fire" + assert results[0]["name"] == "device name" + assert results[1]["entity_id"] == "light.kitchen" + assert results[1]["state"] == "on" + assert results[2]["entity_id"] == "light.kitchen" + assert results[2]["state"] == "off" + + await client.send_json( + { + "id": 3, + "type": "logbook/get_events", + "start_time": now.isoformat(), + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 3 + + results = response["result"] + assert len(results) == 4 + assert results[0]["message"] == "started" + assert results[1]["name"] == "device name" + assert results[1]["message"] == "is on fire" + assert 
isinstance(results[1]["when"], float) + assert results[2]["entity_id"] == "light.kitchen" + assert results[2]["state"] == "on" + assert isinstance(results[2]["when"], float) + assert results[3]["entity_id"] == "light.kitchen" + assert results[3]["state"] == "off" + assert isinstance(results[3]["when"], float) + + +@patch("homeassistant.components.logbook.websocket_api.EVENT_COALESCE_TIME", 0) +async def test_subscribe_unsubscribe_logbook_stream( + hass, recorder_mock, hass_ws_client +): + """Test subscribe/unsubscribe logbook stream.""" + now = dt_util.utcnow() + await asyncio.gather( + *[ + async_setup_component(hass, comp, {}) + for comp in ("homeassistant", "logbook", "automation", "script") + ] + ) + + await hass.async_block_till_done() + init_count = sum(hass.bus.async_listeners().values()) + + hass.states.async_set("binary_sensor.is_light", STATE_ON) + hass.states.async_set("binary_sensor.is_light", STATE_OFF) + state: State = hass.states.get("binary_sensor.is_light") + await hass.async_block_till_done() + + await async_wait_recording_done(hass) + websocket_client = await hass_ws_client() + await websocket_client.send_json( + {"id": 7, "type": "logbook/event_stream", "start_time": now.isoformat()} + ) + + msg = await websocket_client.receive_json() + assert msg["id"] == 7 + assert msg["type"] == TYPE_RESULT + assert msg["success"] + + msg = await websocket_client.receive_json() + assert msg["id"] == 7 + assert msg["type"] == "event" + assert msg["event"] == [ + { + "entity_id": "binary_sensor.is_light", + "state": "off", + "when": state.last_updated.timestamp(), + } + ] + + hass.states.async_set("light.alpha", "on") + hass.states.async_set("light.alpha", "off") + alpha_off_state: State = hass.states.get("light.alpha") + hass.states.async_set("light.zulu", "on", {"color": "blue"}) + hass.states.async_set("light.zulu", "off", {"effect": "help"}) + zulu_off_state: State = hass.states.get("light.zulu") + hass.states.async_set( + "light.zulu", "on", {"effect": "help", "color": ["blue", "green"]} + ) + zulu_on_state: State = hass.states.get("light.zulu") + await hass.async_block_till_done() + + hass.states.async_remove("light.zulu") + await hass.async_block_till_done() + + hass.states.async_set("light.zulu", "on", {"effect": "help", "color": "blue"}) + + msg = await websocket_client.receive_json() + assert msg["id"] == 7 + assert msg["type"] == "event" + assert msg["event"] == [ + { + "entity_id": "light.alpha", + "state": "off", + "when": alpha_off_state.last_updated.timestamp(), + }, + { + "entity_id": "light.zulu", + "state": "off", + "when": zulu_off_state.last_updated.timestamp(), + }, + { + "entity_id": "light.zulu", + "state": "on", + "when": zulu_on_state.last_updated.timestamp(), + }, + ] + + hass.bus.async_fire( + EVENT_AUTOMATION_TRIGGERED, + {ATTR_NAME: "Mock automation", ATTR_ENTITY_ID: "automation.mock_automation"}, + ) + hass.bus.async_fire( + EVENT_SCRIPT_STARTED, + {ATTR_NAME: "Mock script", ATTR_ENTITY_ID: "script.mock_script"}, + ) + hass.bus.async_fire(EVENT_HOMEASSISTANT_START) + await hass.async_block_till_done() + + msg = await websocket_client.receive_json() + assert msg["id"] == 7 + assert msg["type"] == "event" + assert msg["event"] == [ + { + "context_id": ANY, + "domain": "automation", + "entity_id": "automation.mock_automation", + "message": "triggered", + "name": "Mock automation", + "source": None, + "when": ANY, + }, + { + "context_id": ANY, + "domain": "script", + "entity_id": "script.mock_script", + "message": "started", + "name": "Mock script", + "when": 
ANY, + }, + { + "domain": "homeassistant", + "icon": "mdi:home-assistant", + "message": "started", + "name": "Home Assistant", + "when": ANY, + }, + ] + + context = core.Context( + id="ac5bd62de45711eaaeb351041eec8dd9", + user_id="b400facee45711eaa9308bfd3d19e474", + ) + automation_entity_id_test = "automation.alarm" + hass.bus.async_fire( + EVENT_AUTOMATION_TRIGGERED, + {ATTR_NAME: "Mock automation", ATTR_ENTITY_ID: automation_entity_id_test}, + context=context, + ) + hass.bus.async_fire( + EVENT_SCRIPT_STARTED, + {ATTR_NAME: "Mock script", ATTR_ENTITY_ID: "script.mock_script"}, + context=context, + ) + hass.states.async_set( + automation_entity_id_test, + STATE_ON, + {ATTR_FRIENDLY_NAME: "Alarm Automation"}, + context=context, + ) + entity_id_test = "alarm_control_panel.area_001" + hass.states.async_set(entity_id_test, STATE_OFF, context=context) + hass.states.async_set(entity_id_test, STATE_ON, context=context) + entity_id_second = "alarm_control_panel.area_002" + hass.states.async_set(entity_id_second, STATE_OFF, context=context) + hass.states.async_set(entity_id_second, STATE_ON, context=context) + + await hass.async_block_till_done() + + msg = await websocket_client.receive_json() + assert msg["id"] == 7 + assert msg["type"] == "event" + assert msg["event"] == [ + { + "context_id": "ac5bd62de45711eaaeb351041eec8dd9", + "context_user_id": "b400facee45711eaa9308bfd3d19e474", + "domain": "automation", + "entity_id": "automation.alarm", + "message": "triggered", + "name": "Mock automation", + "source": None, + "when": ANY, + }, + { + "context_domain": "automation", + "context_entity_id": "automation.alarm", + "context_event_type": "automation_triggered", + "context_id": "ac5bd62de45711eaaeb351041eec8dd9", + "context_message": "triggered", + "context_name": "Mock automation", + "context_user_id": "b400facee45711eaa9308bfd3d19e474", + "domain": "script", + "entity_id": "script.mock_script", + "message": "started", + "name": "Mock script", + "when": ANY, + }, + { + "context_domain": "automation", + "context_entity_id": "automation.alarm", + "context_event_type": "automation_triggered", + "context_message": "triggered", + "context_name": "Mock automation", + "context_user_id": "b400facee45711eaa9308bfd3d19e474", + "entity_id": "alarm_control_panel.area_001", + "state": "on", + "when": ANY, + }, + { + "context_domain": "automation", + "context_entity_id": "automation.alarm", + "context_event_type": "automation_triggered", + "context_message": "triggered", + "context_name": "Mock automation", + "context_user_id": "b400facee45711eaa9308bfd3d19e474", + "entity_id": "alarm_control_panel.area_002", + "state": "on", + "when": ANY, + }, + ] + hass.bus.async_fire( + EVENT_AUTOMATION_TRIGGERED, + {ATTR_NAME: "Mock automation 2", ATTR_ENTITY_ID: automation_entity_id_test}, + context=context, + ) + + await hass.async_block_till_done() + + msg = await websocket_client.receive_json() + assert msg["id"] == 7 + assert msg["type"] == "event" + assert msg["event"] == [ + { + "context_domain": "automation", + "context_entity_id": "automation.alarm", + "context_event_type": "automation_triggered", + "context_id": "ac5bd62de45711eaaeb351041eec8dd9", + "context_message": "triggered", + "context_name": "Mock automation", + "context_user_id": "b400facee45711eaa9308bfd3d19e474", + "domain": "automation", + "entity_id": "automation.alarm", + "message": "triggered", + "name": "Mock automation 2", + "source": None, + "when": ANY, + } + ] + + await async_wait_recording_done(hass) + hass.bus.async_fire( + 
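# Reusing the earlier context links this event to "Mock automation";
+        # the context_* fields asserted below come from that chain.
+        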
EVENT_AUTOMATION_TRIGGERED, + {ATTR_NAME: "Mock automation 3", ATTR_ENTITY_ID: automation_entity_id_test}, + context=context, + ) + + await hass.async_block_till_done() + msg = await websocket_client.receive_json() + assert msg["id"] == 7 + assert msg["type"] == "event" + assert msg["event"] == [ + { + "context_domain": "automation", + "context_entity_id": "automation.alarm", + "context_event_type": "automation_triggered", + "context_id": "ac5bd62de45711eaaeb351041eec8dd9", + "context_message": "triggered", + "context_name": "Mock automation", + "context_user_id": "b400facee45711eaa9308bfd3d19e474", + "domain": "automation", + "entity_id": "automation.alarm", + "message": "triggered", + "name": "Mock automation 3", + "source": None, + "when": ANY, + } + ] + + await websocket_client.send_json( + {"id": 8, "type": "unsubscribe_events", "subscription": 7} + ) + msg = await websocket_client.receive_json() + + assert msg["id"] == 8 + assert msg["type"] == TYPE_RESULT + assert msg["success"] + + # Check our listener got unsubscribed + assert sum(hass.bus.async_listeners().values()) == init_count + + +@patch("homeassistant.components.logbook.websocket_api.EVENT_COALESCE_TIME", 0) +async def test_subscribe_unsubscribe_logbook_stream_entities( + hass, recorder_mock, hass_ws_client +): + """Test subscribe/unsubscribe logbook stream with specific entities.""" + now = dt_util.utcnow() + await asyncio.gather( + *[ + async_setup_component(hass, comp, {}) + for comp in ("homeassistant", "logbook", "automation", "script") + ] + ) + + await hass.async_block_till_done() + init_count = sum(hass.bus.async_listeners().values()) + hass.states.async_set("light.small", STATE_ON) + hass.states.async_set("binary_sensor.is_light", STATE_ON) + hass.states.async_set("binary_sensor.is_light", STATE_OFF) + state: State = hass.states.get("binary_sensor.is_light") + await hass.async_block_till_done() + + await async_wait_recording_done(hass) + websocket_client = await hass_ws_client() + await websocket_client.send_json( + { + "id": 7, + "type": "logbook/event_stream", + "start_time": now.isoformat(), + "entity_ids": ["light.small", "binary_sensor.is_light"], + } + ) + + msg = await websocket_client.receive_json() + assert msg["id"] == 7 + assert msg["type"] == TYPE_RESULT + assert msg["success"] + + msg = await websocket_client.receive_json() + assert msg["id"] == 7 + assert msg["type"] == "event" + assert msg["event"] == [ + { + "entity_id": "binary_sensor.is_light", + "state": "off", + "when": state.last_updated.timestamp(), + } + ] + + hass.states.async_set("light.alpha", STATE_ON) + hass.states.async_set("light.alpha", STATE_OFF) + hass.states.async_set("light.small", STATE_OFF, {"effect": "help", "color": "blue"}) + + await hass.async_block_till_done() + + msg = await websocket_client.receive_json() + assert msg["id"] == 7 + assert msg["type"] == "event" + assert msg["event"] == [ + { + "entity_id": "light.small", + "state": "off", + "when": ANY, + }, + ] + + hass.states.async_remove("light.alpha") + hass.states.async_remove("light.small") + await hass.async_block_till_done() + + await websocket_client.send_json( + {"id": 8, "type": "unsubscribe_events", "subscription": 7} + ) + msg = await websocket_client.receive_json() + + assert msg["id"] == 8 + assert msg["type"] == TYPE_RESULT + assert msg["success"] + + # Check our listener got unsubscribed + assert sum(hass.bus.async_listeners().values()) == init_count + + +@patch("homeassistant.components.logbook.websocket_api.EVENT_COALESCE_TIME", 0) +async def 
test_subscribe_unsubscribe_logbook_stream_device(
+    hass, recorder_mock, hass_ws_client
+):
+    """Test subscribe/unsubscribe logbook stream with a device."""
+    now = dt_util.utcnow()
+    await asyncio.gather(
+        *[
+            async_setup_component(hass, comp, {})
+            for comp in ("homeassistant", "logbook", "automation", "script")
+        ]
+    )
+    device = await _async_mock_device_with_logbook_platform(hass)
+
+    await hass.async_block_till_done()
+    init_count = sum(hass.bus.async_listeners().values())
+
+    await async_wait_recording_done(hass)
+    websocket_client = await hass_ws_client()
+    await websocket_client.send_json(
+        {
+            "id": 7,
+            "type": "logbook/event_stream",
+            "start_time": now.isoformat(),
+            "device_ids": [device.id],
+        }
+    )
+
+    msg = await websocket_client.receive_json()
+    assert msg["id"] == 7
+    assert msg["type"] == TYPE_RESULT
+    assert msg["success"]
+
+    # There are no answers to our initial query
+    # so we get an empty reply. This is to ensure
+    # consumers of the API know there are no results
+    # and it's not a failure case. This is useful
+    # in the frontend so we can tell the user there
+    # are no results vs waiting for them to appear
+    msg = await websocket_client.receive_json()
+    assert msg["id"] == 7
+    assert msg["type"] == "event"
+    assert msg["event"] == []
+
+    hass.bus.async_fire("mock_event", {"device_id": device.id})
+    await hass.async_block_till_done()
+
+    msg = await websocket_client.receive_json()
+    assert msg["id"] == 7
+    assert msg["type"] == "event"
+    assert msg["event"] == [
+        {"domain": "test", "message": "is on fire", "name": "device name", "when": ANY}
+    ]
+
+    await websocket_client.send_json(
+        {"id": 8, "type": "unsubscribe_events", "subscription": 7}
+    )
+    msg = await websocket_client.receive_json()
+
+    assert msg["id"] == 8
+    assert msg["type"] == TYPE_RESULT
+    assert msg["success"]
+
+    # Check our listener got unsubscribed
+    assert sum(hass.bus.async_listeners().values()) == init_count
+
+
+async def test_event_stream_bad_start_time(hass, hass_ws_client, recorder_mock):
+    """Test event_stream bad start time."""
+    await async_setup_component(hass, "logbook", {})
+    await async_recorder_block_till_done(hass)
+
+    client = await hass_ws_client()
+    await client.send_json(
+        {
+            "id": 1,
+            "type": "logbook/event_stream",
+            "start_time": "cats",
+        }
+    )
+    response = await client.receive_json()
+    assert not response["success"]
+    assert response["error"]["code"] == "invalid_start_time"
+
+
+async def test_live_stream_with_one_second_commit_interval(
+    hass: HomeAssistant,
+    async_setup_recorder_instance: SetupRecorderInstanceT,
+    hass_ws_client,
+):
+    """Test the recorder with a 0.5s commit interval."""
+    config = {recorder.CONF_COMMIT_INTERVAL: 0.5}
+    await async_setup_recorder_instance(hass, config)
+    now = dt_util.utcnow()
+    await asyncio.gather(
+        *[
+            async_setup_component(hass, comp, {})
+            for comp in ("homeassistant", "logbook", "automation", "script")
+        ]
+    )
+    device = await _async_mock_device_with_logbook_platform(hass)
+
+    await hass.async_block_till_done()
+    init_count = sum(hass.bus.async_listeners().values())
+
+    hass.bus.async_fire("mock_event", {"device_id": device.id, "message": "1"})
+
+    await async_wait_recording_done(hass)
+
+    hass.bus.async_fire("mock_event", {"device_id": device.id, "message": "2"})
+
+    await hass.async_block_till_done()
+
+    hass.bus.async_fire("mock_event", {"device_id": device.id, "message": "3"})
+
+    websocket_client = await hass_ws_client()
+    await websocket_client.send_json(
+        {
+            "id": 7,
+            "type": "logbook/event_stream",
+            
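# The stream first replays history from start_time, then switches
+            # over to live bus events.
+            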
"start_time": now.isoformat(), + "device_ids": [device.id], + } + ) + hass.bus.async_fire("mock_event", {"device_id": device.id, "message": "4"}) + + msg = await websocket_client.receive_json() + assert msg["id"] == 7 + assert msg["type"] == TYPE_RESULT + assert msg["success"] + + hass.bus.async_fire("mock_event", {"device_id": device.id, "message": "5"}) + + recieved_rows = [] + msg = await websocket_client.receive_json() + assert msg["id"] == 7 + assert msg["type"] == "event" + recieved_rows.extend(msg["event"]) + + hass.bus.async_fire("mock_event", {"device_id": device.id, "message": "6"}) + + await hass.async_block_till_done() + + hass.bus.async_fire("mock_event", {"device_id": device.id, "message": "7"}) + + while not len(recieved_rows) == 7: + msg = await asyncio.wait_for(websocket_client.receive_json(), 2.5) + assert msg["id"] == 7 + assert msg["type"] == "event" + recieved_rows.extend(msg["event"]) + + # Make sure we get rows back in order + assert recieved_rows == [ + {"domain": "test", "message": "1", "name": "device name", "when": ANY}, + {"domain": "test", "message": "2", "name": "device name", "when": ANY}, + {"domain": "test", "message": "3", "name": "device name", "when": ANY}, + {"domain": "test", "message": "4", "name": "device name", "when": ANY}, + {"domain": "test", "message": "5", "name": "device name", "when": ANY}, + {"domain": "test", "message": "6", "name": "device name", "when": ANY}, + {"domain": "test", "message": "7", "name": "device name", "when": ANY}, + ] + + await websocket_client.send_json( + {"id": 8, "type": "unsubscribe_events", "subscription": 7} + ) + msg = await websocket_client.receive_json() + + assert msg["id"] == 8 + assert msg["type"] == TYPE_RESULT + assert msg["success"] + + # Check our listener got unsubscribed + assert sum(hass.bus.async_listeners().values()) == init_count + + +@patch("homeassistant.components.logbook.websocket_api.EVENT_COALESCE_TIME", 0) +async def test_subscribe_disconnected(hass, recorder_mock, hass_ws_client): + """Test subscribe/unsubscribe logbook stream gets disconnected.""" + now = dt_util.utcnow() + await asyncio.gather( + *[ + async_setup_component(hass, comp, {}) + for comp in ("homeassistant", "logbook", "automation", "script") + ] + ) + await async_wait_recording_done(hass) + + init_count = sum(hass.bus.async_listeners().values()) + hass.states.async_set("light.small", STATE_ON) + hass.states.async_set("binary_sensor.is_light", STATE_ON) + hass.states.async_set("binary_sensor.is_light", STATE_OFF) + state: State = hass.states.get("binary_sensor.is_light") + await hass.async_block_till_done() + + await async_wait_recording_done(hass) + websocket_client = await hass_ws_client() + await websocket_client.send_json( + { + "id": 7, + "type": "logbook/event_stream", + "start_time": now.isoformat(), + "entity_ids": ["light.small", "binary_sensor.is_light"], + } + ) + + msg = await websocket_client.receive_json() + assert msg["id"] == 7 + assert msg["type"] == TYPE_RESULT + assert msg["success"] + + msg = await websocket_client.receive_json() + assert msg["id"] == 7 + assert msg["type"] == "event" + assert msg["event"] == [ + { + "entity_id": "binary_sensor.is_light", + "state": "off", + "when": state.last_updated.timestamp(), + } + ] + + await websocket_client.close() + await hass.async_block_till_done() + + # Check our listener got unsubscribed + assert sum(hass.bus.async_listeners().values()) == init_count + + +@patch("homeassistant.components.logbook.websocket_api.EVENT_COALESCE_TIME", 0) +async def 
test_stream_consumer_stop_processing(hass, recorder_mock, hass_ws_client):
+    """Test we unsubscribe if the stream consumer fails or is canceled."""
+    now = dt_util.utcnow()
+    await asyncio.gather(
+        *[
+            async_setup_component(hass, comp, {})
+            for comp in ("homeassistant", "logbook", "automation", "script")
+        ]
+    )
+    await async_wait_recording_done(hass)
+    init_count = sum(hass.bus.async_listeners().values())
+    hass.states.async_set("light.small", STATE_ON)
+    hass.states.async_set("binary_sensor.is_light", STATE_ON)
+    hass.states.async_set("binary_sensor.is_light", STATE_OFF)
+    await hass.async_block_till_done()
+
+    await async_wait_recording_done(hass)
+    websocket_client = await hass_ws_client()
+
+    after_ws_created_count = sum(hass.bus.async_listeners().values())
+
+    with patch.object(websocket_api, "MAX_PENDING_LOGBOOK_EVENTS", 5), patch.object(
+        websocket_api, "_async_events_consumer"
+    ):
+        await websocket_client.send_json(
+            {
+                "id": 7,
+                "type": "logbook/event_stream",
+                "start_time": now.isoformat(),
+                "entity_ids": ["light.small", "binary_sensor.is_light"],
+            }
+        )
+        await async_wait_recording_done(hass)
+
+    msg = await websocket_client.receive_json()
+    assert msg["id"] == 7
+    assert msg["type"] == TYPE_RESULT
+    assert msg["success"]
+
+    assert sum(hass.bus.async_listeners().values()) != init_count
+    for _ in range(5):
+        hass.states.async_set("binary_sensor.is_light", STATE_ON)
+        hass.states.async_set("binary_sensor.is_light", STATE_OFF)
+    await async_wait_recording_done(hass)
+
+    # Check our listener got unsubscribed because
+    # the queue got full and the overload safety tripped
+    assert sum(hass.bus.async_listeners().values()) == after_ws_created_count
+    await websocket_client.close()
+    assert sum(hass.bus.async_listeners().values()) == init_count
+
+
+@patch("homeassistant.components.logbook.websocket_api.EVENT_COALESCE_TIME", 0)
+@patch("homeassistant.components.logbook.websocket_api.MAX_RECORDER_WAIT", 0.15)
+async def test_recorder_is_far_behind(hass, recorder_mock, hass_ws_client, caplog):
+    """Test we still start live streaming if the recorder is far behind."""
+    now = dt_util.utcnow()
+    await asyncio.gather(
+        *[
+            async_setup_component(hass, comp, {})
+            for comp in ("homeassistant", "logbook", "automation", "script")
+        ]
+    )
+    await async_wait_recording_done(hass)
+    device = await _async_mock_device_with_logbook_platform(hass)
+    await async_wait_recording_done(hass)
+
+    # Block the recorder queue
+    await async_block_recorder(hass, 0.3)
+    await hass.async_block_till_done()
+
+    websocket_client = await hass_ws_client()
+    await websocket_client.send_json(
+        {
+            "id": 7,
+            "type": "logbook/event_stream",
+            "start_time": now.isoformat(),
+            "device_ids": [device.id],
+        }
+    )
+
+    msg = await websocket_client.receive_json()
+    assert msg["id"] == 7
+    assert msg["type"] == TYPE_RESULT
+    assert msg["success"]
+
+    # There are no answers to our initial query
+    # so we get an empty reply. This is to ensure
+    # consumers of the API know there are no results
+    # and it's not a failure case. This is useful
+    # in the frontend so we can tell the user there
+    # are no results vs waiting for them to appear
+    msg = await websocket_client.receive_json()
+    assert msg["id"] == 7
+    assert msg["type"] == "event"
+    assert msg["event"] == []
+
+    hass.bus.async_fire("mock_event", {"device_id": device.id, "message": "1"})
+    await hass.async_block_till_done()
+
+    msg = await websocket_client.receive_json()
+    assert msg["id"] == 7
+    assert msg["type"] == "event"
+    assert msg["event"] == [
+        {"domain": "test", "message": "1", "name": "device name", "when": ANY}
+    ]
+
+    hass.bus.async_fire("mock_event", {"device_id": device.id, "message": "2"})
+    await hass.async_block_till_done()
+
+    msg = await websocket_client.receive_json()
+    assert msg["id"] == 7
+    assert msg["type"] == "event"
+    assert msg["event"] == [
+        {"domain": "test", "message": "2", "name": "device name", "when": ANY}
+    ]
+
+    await websocket_client.send_json(
+        {"id": 8, "type": "unsubscribe_events", "subscription": 7}
+    )
+    msg = await websocket_client.receive_json()
+
+    assert msg["id"] == 8
+    assert msg["type"] == TYPE_RESULT
+    assert msg["success"]
+
+    assert "Recorder is behind" in caplog.text
diff --git a/tests/components/recorder/common.py b/tests/components/recorder/common.py
index b36547309cd..39cde4c2e7c 100644
--- a/tests/components/recorder/common.py
+++ b/tests/components/recorder/common.py
@@ -1,7 +1,10 @@
 """Common test utils for working with recorder."""
 from __future__ import annotations
 
+import asyncio
+from dataclasses import dataclass
 from datetime import datetime, timedelta
+import time
 from typing import Any, cast
 
 from sqlalchemy import create_engine
@@ -10,8 +13,9 @@ from sqlalchemy.orm.session import Session
 from homeassistant import core as ha
 from homeassistant.components import recorder
 from homeassistant.components.recorder import get_instance, statistics
+from homeassistant.components.recorder.core import Recorder
 from homeassistant.components.recorder.models import RecorderRuns
-from homeassistant.components.recorder.tasks import StatisticsTask
+from homeassistant.components.recorder.tasks import RecorderTask, StatisticsTask
 from homeassistant.core import HomeAssistant
 from homeassistant.util import dt as dt_util
 
@@ -21,6 +25,31 @@ from tests.components.recorder import models_schema_0
 DEFAULT_PURGE_TASKS = 3
 
 
+@dataclass
+class BlockRecorderTask(RecorderTask):
+    """A task to block the recorder for testing only."""
+
+    event: asyncio.Event
+    seconds: float
+
+    def run(self, instance: Recorder) -> None:
+        """Block the recorder's event loop."""
+        instance.hass.loop.call_soon_threadsafe(self.event.set)
+        time.sleep(self.seconds)
+
+
+async def async_block_recorder(hass: HomeAssistant, seconds: float) -> None:
+    """Block the recorder's event loop for testing.
+
+    Returns as soon as the recorder has started the block.
+
+    Does not wait for the block to finish.
+    """
+    event = asyncio.Event()
+    get_instance(hass).queue_task(BlockRecorderTask(event, seconds))
+    await event.wait()
+
+
 def do_adhoc_statistics(hass: HomeAssistant, **kwargs: Any) -> None:
     """Trigger an adhoc statistics run."""
     if not (start := kwargs.get("start")):
diff --git a/tests/components/recorder/test_init.py b/tests/components/recorder/test_init.py
index 49133dd67bd..41c4428ee5e 100644
--- a/tests/components/recorder/test_init.py
+++ b/tests/components/recorder/test_init.py
@@ -55,6 +55,7 @@ from homeassistant.setup import async_setup_component, setup_component
 from homeassistant.util import dt as dt_util
 
 from .common import (
+    async_block_recorder,
     async_wait_recording_done,
     corrupt_db_file,
     run_information_with_session,
@@ -1537,3 +1538,25 @@ def test_deduplication_state_attributes_inside_commit_interval(hass_recorder, ca
     first_attributes_id = states[0].attributes_id
     last_attributes_id = states[-1].attributes_id
     assert first_attributes_id == last_attributes_id
+
+
+async def test_async_block_till_done(hass, async_setup_recorder_instance):
+    """Test we can block until recording is done."""
+    instance = await async_setup_recorder_instance(hass)
+    await async_wait_recording_done(hass)
+
+    entity_id = "test.recorder"
+    attributes = {"test_attr": 5, "test_attr_10": "nice"}
+
+    hass.states.async_set(entity_id, "on", attributes)
+    hass.states.async_set(entity_id, "off", attributes)
+
+    def _fetch_states():
+        with session_scope(hass=hass) as session:
+            return list(session.query(States).filter(States.entity_id == entity_id))
+
+    await async_block_recorder(hass, 0.1)
+    await instance.async_block_till_done()
+    states = await instance.async_add_executor_job(_fetch_states)
+    assert len(states) == 2
+    await hass.async_block_till_done()
diff --git a/tests/test_core.py b/tests/test_core.py
index 1ac7002cb7b..ee1005a60b0 100644
--- a/tests/test_core.py
+++ b/tests/test_core.py
@@ -1799,3 +1799,33 @@ async def test_state_firing_event_matches_context_id_ulid_time(hass):
     assert _ulid_timestamp(event.context.id) == int(
         events[0].time_fired.timestamp() * 1000
     )
+
+
+async def test_event_context(hass):
+    """Test we can look up the origin of a context from an event."""
+    events = []
+
+    @ha.callback
+    def capture_events(event):
+        nonlocal events
+        events.append(event)
+
+    cancel = hass.bus.async_listen("dummy_event", capture_events)
+    cancel2 = hass.bus.async_listen("dummy_event_2", capture_events)
+
+    hass.bus.async_fire("dummy_event")
+    await hass.async_block_till_done()
+
+    dummy_event: ha.Event = events[0]
+
+    hass.bus.async_fire("dummy_event_2", context=dummy_event.context)
+    await hass.async_block_till_done()
+    context_id = dummy_event.context.id
+
+    dummy_event2: ha.Event = events[1]
+    assert dummy_event2.context == dummy_event.context
+    assert dummy_event2.context.id == context_id
+    cancel()
+    cancel2()
+
+    assert dummy_event2.context.origin_event == dummy_event