diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py index 21acc183e50..103435fa519 100644 --- a/homeassistant/components/recorder/__init__.py +++ b/homeassistant/components/recorder/__init__.py @@ -52,7 +52,13 @@ from homeassistant.loader import bind_hass import homeassistant.util.dt as dt_util from . import history, migration, purge, statistics, websocket_api -from .const import CONF_DB_INTEGRITY_CHECK, DATA_INSTANCE, DOMAIN, SQLITE_URL_PREFIX +from .const import ( + CONF_DB_INTEGRITY_CHECK, + DATA_INSTANCE, + DOMAIN, + MAX_QUEUE_BACKLOG, + SQLITE_URL_PREFIX, +) from .models import ( Base, Events, @@ -83,8 +89,6 @@ ATTR_KEEP_DAYS = "keep_days" ATTR_REPACK = "repack" ATTR_APPLY_FILTER = "apply_filter" -MAX_QUEUE_BACKLOG = 30000 - SERVICE_PURGE_SCHEMA = vol.Schema( { vol.Optional(ATTR_KEEP_DAYS): cv.positive_int, @@ -1089,3 +1093,8 @@ class Recorder(threading.Thread): self.hass.add_job(self._async_stop_queue_watcher_and_event_listener) self._end_session() self._close_connection() + + @property + def recording(self): + """Return if the recorder is recording.""" + return self._event_listener is not None diff --git a/homeassistant/components/recorder/const.py b/homeassistant/components/recorder/const.py index eab3c30e99e..a04218264ee 100644 --- a/homeassistant/components/recorder/const.py +++ b/homeassistant/components/recorder/const.py @@ -6,6 +6,8 @@ DOMAIN = "recorder" CONF_DB_INTEGRITY_CHECK = "db_integrity_check" +MAX_QUEUE_BACKLOG = 30000 + # The maximum number of rows (events) we purge in one delete statement # sqlite3 has a limit of 999 until version 3.32.0 diff --git a/homeassistant/components/recorder/websocket_api.py b/homeassistant/components/recorder/websocket_api.py index ba77692fe8e..b06d2d07f1e 100644 --- a/homeassistant/components/recorder/websocket_api.py +++ b/homeassistant/components/recorder/websocket_api.py @@ -1,14 +1,19 @@ """The Energy websocket API.""" from __future__ import annotations +from typing import TYPE_CHECKING + import voluptuous as vol from homeassistant.components import websocket_api from homeassistant.core import HomeAssistant, callback -from .const import DATA_INSTANCE +from .const import DATA_INSTANCE, MAX_QUEUE_BACKLOG from .statistics import validate_statistics +if TYPE_CHECKING: + from . import Recorder + @callback def async_setup(hass: HomeAssistant) -> None: @@ -16,6 +21,7 @@ def async_setup(hass: HomeAssistant) -> None: websocket_api.async_register_command(hass, ws_validate_statistics) websocket_api.async_register_command(hass, ws_clear_statistics) websocket_api.async_register_command(hass, ws_update_statistics_metadata) + websocket_api.async_register_command(hass, ws_info) @websocket_api.websocket_command( @@ -72,3 +78,30 @@ def ws_update_statistics_metadata( msg["statistic_id"], msg["unit_of_measurement"] ) connection.send_result(msg["id"]) + + +@websocket_api.websocket_command( + { + vol.Required("type"): "recorder/info", + } +) +@callback +def ws_info( + hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict +) -> None: + """Return status of the recorder.""" + instance: Recorder = hass.data[DATA_INSTANCE] + + backlog = instance.queue.qsize() if instance and instance.queue else None + migration_in_progress = instance.migration_in_progress if instance else False + recording = instance.recording if instance else False + thread_alive = instance.is_alive() if instance else False + + recorder_info = { + "backlog": backlog, + "max_backlog": MAX_QUEUE_BACKLOG, + "migration_in_progress": migration_in_progress, + "recording": recording, + "thread_running": thread_alive, + } + connection.send_result(msg["id"], recorder_info) diff --git a/tests/components/recorder/common.py b/tests/components/recorder/common.py index 7414548c864..b21d671560c 100644 --- a/tests/components/recorder/common.py +++ b/tests/components/recorder/common.py @@ -1,12 +1,15 @@ """Common test utils for working with recorder.""" from datetime import timedelta +from sqlalchemy import create_engine + from homeassistant import core as ha from homeassistant.components import recorder from homeassistant.core import HomeAssistant from homeassistant.util import dt as dt_util from tests.common import async_fire_time_changed, fire_time_changed +from tests.components.recorder import models_original DEFAULT_PURGE_TASKS = 3 @@ -80,3 +83,13 @@ def corrupt_db_file(test_db_file): with open(test_db_file, "w+") as fhandle: fhandle.seek(200) fhandle.write("I am a corrupt db" * 100) + + +def create_engine_test(*args, **kwargs): + """Test version of create_engine that initializes with old schema. + + This simulates an existing db with the old schema. + """ + engine = create_engine(*args, **kwargs) + models_original.Base.metadata.create_all(engine) + return engine diff --git a/tests/components/recorder/test_migrate.py b/tests/components/recorder/test_migrate.py index 5586e06d337..562935e78a1 100644 --- a/tests/components/recorder/test_migrate.py +++ b/tests/components/recorder/test_migrate.py @@ -23,10 +23,9 @@ from homeassistant.components.recorder.models import States from homeassistant.components.recorder.util import session_scope import homeassistant.util.dt as dt_util -from .common import async_wait_recording_done_without_instance +from .common import async_wait_recording_done_without_instance, create_engine_test from tests.common import async_fire_time_changed -from tests.components.recorder import models_original def _get_native_states(hass, entity_id): @@ -37,16 +36,6 @@ def _get_native_states(hass, entity_id): ] -def create_engine_test(*args, **kwargs): - """Test version of create_engine that initializes with old schema. - - This simulates an existing db with the old schema. - """ - engine = create_engine(*args, **kwargs) - models_original.Base.metadata.create_all(engine) - return engine - - async def test_schema_update_calls(hass): """Test that schema migrations occur in correct order.""" assert await recorder.async_migration_in_progress(hass) is False diff --git a/tests/components/recorder/test_websocket_api.py b/tests/components/recorder/test_websocket_api.py index d52393fb693..52a9424b4ac 100644 --- a/tests/components/recorder/test_websocket_api.py +++ b/tests/components/recorder/test_websocket_api.py @@ -1,18 +1,29 @@ """The tests for sensor recorder platform.""" # pylint: disable=protected-access,invalid-name from datetime import timedelta +import threading +from unittest.mock import patch import pytest from pytest import approx +from homeassistant.components import recorder from homeassistant.components.recorder.const import DATA_INSTANCE from homeassistant.setup import async_setup_component import homeassistant.util.dt as dt_util from homeassistant.util.unit_system import METRIC_SYSTEM -from .common import trigger_db_commit +from .common import ( + async_wait_recording_done_without_instance, + create_engine_test, + trigger_db_commit, +) -from tests.common import init_recorder_component +from tests.common import ( + async_fire_time_changed, + async_init_recorder_component, + init_recorder_component, +) POWER_SENSOR_ATTRIBUTES = { "device_class": "power", @@ -237,3 +248,113 @@ async def test_update_statistics_metadata(hass, hass_ws_client, new_unit): "unit_of_measurement": new_unit, } ] + + +async def test_recorder_info(hass, hass_ws_client): + """Test getting recorder status.""" + client = await hass_ws_client() + await async_init_recorder_component(hass) + + # Ensure there are no queued events + await async_wait_recording_done_without_instance(hass) + + await client.send_json({"id": 1, "type": "recorder/info"}) + response = await client.receive_json() + assert response["success"] + assert response["result"] == { + "backlog": 0, + "max_backlog": 30000, + "migration_in_progress": False, + "recording": True, + "thread_running": True, + } + + +async def test_recorder_info_no_recorder(hass, hass_ws_client): + """Test getting recorder status when recorder is not present.""" + client = await hass_ws_client() + + await client.send_json({"id": 1, "type": "recorder/info"}) + response = await client.receive_json() + assert not response["success"] + assert response["error"]["code"] == "unknown_command" + + +async def test_recorder_info_bad_recorder_config(hass, hass_ws_client): + """Test getting recorder status when recorder is not started.""" + config = {recorder.CONF_DB_URL: "sqlite://no_file", recorder.CONF_DB_RETRY_WAIT: 0} + + client = await hass_ws_client() + + with patch("homeassistant.components.recorder.migration.migrate_schema"): + assert not await async_setup_component( + hass, recorder.DOMAIN, {recorder.DOMAIN: config} + ) + assert recorder.DOMAIN not in hass.config.components + await hass.async_block_till_done() + + # Wait for recorder to shut down + await hass.async_add_executor_job(hass.data[DATA_INSTANCE].join) + + await client.send_json({"id": 1, "type": "recorder/info"}) + response = await client.receive_json() + assert response["success"] + assert response["result"]["recording"] is False + assert response["result"]["thread_running"] is False + + +async def test_recorder_info_migration_queue_exhausted(hass, hass_ws_client): + """Test getting recorder status when recorder queue is exhausted.""" + assert await recorder.async_migration_in_progress(hass) is False + + migration_done = threading.Event() + + real_migration = recorder.migration.migrate_schema + + def stalled_migration(*args): + """Make migration stall.""" + nonlocal migration_done + migration_done.wait() + return real_migration(*args) + + with patch( + "homeassistant.components.recorder.Recorder.async_periodic_statistics" + ), patch( + "homeassistant.components.recorder.create_engine", new=create_engine_test + ), patch.object( + recorder, "MAX_QUEUE_BACKLOG", 1 + ), patch( + "homeassistant.components.recorder.migration.migrate_schema", + wraps=stalled_migration, + ): + await async_setup_component( + hass, "recorder", {"recorder": {"db_url": "sqlite://"}} + ) + hass.states.async_set("my.entity", "on", {}) + await hass.async_block_till_done() + + # Detect queue full + async_fire_time_changed(hass, dt_util.utcnow() + timedelta(hours=2)) + await hass.async_block_till_done() + + client = await hass_ws_client() + + # Check the status + await client.send_json({"id": 1, "type": "recorder/info"}) + response = await client.receive_json() + assert response["success"] + assert response["result"]["migration_in_progress"] is True + assert response["result"]["recording"] is False + assert response["result"]["thread_running"] is True + + # Let migration finish + migration_done.set() + await async_wait_recording_done_without_instance(hass) + + # Check the status after migration finished + await client.send_json({"id": 2, "type": "recorder/info"}) + response = await client.receive_json() + assert response["success"] + assert response["result"]["migration_in_progress"] is False + assert response["result"]["recording"] is True + assert response["result"]["thread_running"] is True