Add recorder status WS API (#58989)

* Add recorder status WS API

* Rename recorder/status to recorder/info

* Silence pylint

* Improve tests

* Address review comments

* Tweak

* Try to fix tests

* Try to debug flaky tests

* Try to fix tests

* Revert changes to async_migration_in_progress

* Try to fix tests

* Remove debug prints

* Apply suggestions from code review
This commit is contained in:
Erik Montnemery 2021-11-04 16:46:45 +01:00 committed by GitHub
parent be4e9f91b6
commit 4c5aca93df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 185 additions and 18 deletions

View File

@ -52,7 +52,13 @@ from homeassistant.loader import bind_hass
import homeassistant.util.dt as dt_util import homeassistant.util.dt as dt_util
from . import history, migration, purge, statistics, websocket_api from . import history, migration, purge, statistics, websocket_api
from .const import CONF_DB_INTEGRITY_CHECK, DATA_INSTANCE, DOMAIN, SQLITE_URL_PREFIX from .const import (
CONF_DB_INTEGRITY_CHECK,
DATA_INSTANCE,
DOMAIN,
MAX_QUEUE_BACKLOG,
SQLITE_URL_PREFIX,
)
from .models import ( from .models import (
Base, Base,
Events, Events,
@ -83,8 +89,6 @@ ATTR_KEEP_DAYS = "keep_days"
ATTR_REPACK = "repack" ATTR_REPACK = "repack"
ATTR_APPLY_FILTER = "apply_filter" ATTR_APPLY_FILTER = "apply_filter"
MAX_QUEUE_BACKLOG = 30000
SERVICE_PURGE_SCHEMA = vol.Schema( SERVICE_PURGE_SCHEMA = vol.Schema(
{ {
vol.Optional(ATTR_KEEP_DAYS): cv.positive_int, vol.Optional(ATTR_KEEP_DAYS): cv.positive_int,
@ -1089,3 +1093,8 @@ class Recorder(threading.Thread):
self.hass.add_job(self._async_stop_queue_watcher_and_event_listener) self.hass.add_job(self._async_stop_queue_watcher_and_event_listener)
self._end_session() self._end_session()
self._close_connection() self._close_connection()
@property
def recording(self):
"""Return if the recorder is recording."""
return self._event_listener is not None

View File

@ -6,6 +6,8 @@ DOMAIN = "recorder"
CONF_DB_INTEGRITY_CHECK = "db_integrity_check" CONF_DB_INTEGRITY_CHECK = "db_integrity_check"
MAX_QUEUE_BACKLOG = 30000
# The maximum number of rows (events) we purge in one delete statement # The maximum number of rows (events) we purge in one delete statement
# sqlite3 has a limit of 999 until version 3.32.0 # sqlite3 has a limit of 999 until version 3.32.0

View File

@ -1,14 +1,19 @@
"""The Energy websocket API.""" """The Energy websocket API."""
from __future__ import annotations from __future__ import annotations
from typing import TYPE_CHECKING
import voluptuous as vol import voluptuous as vol
from homeassistant.components import websocket_api from homeassistant.components import websocket_api
from homeassistant.core import HomeAssistant, callback from homeassistant.core import HomeAssistant, callback
from .const import DATA_INSTANCE from .const import DATA_INSTANCE, MAX_QUEUE_BACKLOG
from .statistics import validate_statistics from .statistics import validate_statistics
if TYPE_CHECKING:
from . import Recorder
@callback @callback
def async_setup(hass: HomeAssistant) -> None: def async_setup(hass: HomeAssistant) -> None:
@ -16,6 +21,7 @@ def async_setup(hass: HomeAssistant) -> None:
websocket_api.async_register_command(hass, ws_validate_statistics) websocket_api.async_register_command(hass, ws_validate_statistics)
websocket_api.async_register_command(hass, ws_clear_statistics) websocket_api.async_register_command(hass, ws_clear_statistics)
websocket_api.async_register_command(hass, ws_update_statistics_metadata) websocket_api.async_register_command(hass, ws_update_statistics_metadata)
websocket_api.async_register_command(hass, ws_info)
@websocket_api.websocket_command( @websocket_api.websocket_command(
@ -72,3 +78,30 @@ def ws_update_statistics_metadata(
msg["statistic_id"], msg["unit_of_measurement"] msg["statistic_id"], msg["unit_of_measurement"]
) )
connection.send_result(msg["id"]) connection.send_result(msg["id"])
@websocket_api.websocket_command(
{
vol.Required("type"): "recorder/info",
}
)
@callback
def ws_info(
hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict
) -> None:
"""Return status of the recorder."""
instance: Recorder = hass.data[DATA_INSTANCE]
backlog = instance.queue.qsize() if instance and instance.queue else None
migration_in_progress = instance.migration_in_progress if instance else False
recording = instance.recording if instance else False
thread_alive = instance.is_alive() if instance else False
recorder_info = {
"backlog": backlog,
"max_backlog": MAX_QUEUE_BACKLOG,
"migration_in_progress": migration_in_progress,
"recording": recording,
"thread_running": thread_alive,
}
connection.send_result(msg["id"], recorder_info)

View File

@ -1,12 +1,15 @@
"""Common test utils for working with recorder.""" """Common test utils for working with recorder."""
from datetime import timedelta from datetime import timedelta
from sqlalchemy import create_engine
from homeassistant import core as ha from homeassistant import core as ha
from homeassistant.components import recorder from homeassistant.components import recorder
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.util import dt as dt_util from homeassistant.util import dt as dt_util
from tests.common import async_fire_time_changed, fire_time_changed from tests.common import async_fire_time_changed, fire_time_changed
from tests.components.recorder import models_original
DEFAULT_PURGE_TASKS = 3 DEFAULT_PURGE_TASKS = 3
@ -80,3 +83,13 @@ def corrupt_db_file(test_db_file):
with open(test_db_file, "w+") as fhandle: with open(test_db_file, "w+") as fhandle:
fhandle.seek(200) fhandle.seek(200)
fhandle.write("I am a corrupt db" * 100) fhandle.write("I am a corrupt db" * 100)
def create_engine_test(*args, **kwargs):
"""Test version of create_engine that initializes with old schema.
This simulates an existing db with the old schema.
"""
engine = create_engine(*args, **kwargs)
models_original.Base.metadata.create_all(engine)
return engine

View File

@ -23,10 +23,9 @@ from homeassistant.components.recorder.models import States
from homeassistant.components.recorder.util import session_scope from homeassistant.components.recorder.util import session_scope
import homeassistant.util.dt as dt_util import homeassistant.util.dt as dt_util
from .common import async_wait_recording_done_without_instance from .common import async_wait_recording_done_without_instance, create_engine_test
from tests.common import async_fire_time_changed from tests.common import async_fire_time_changed
from tests.components.recorder import models_original
def _get_native_states(hass, entity_id): def _get_native_states(hass, entity_id):
@ -37,16 +36,6 @@ def _get_native_states(hass, entity_id):
] ]
def create_engine_test(*args, **kwargs):
"""Test version of create_engine that initializes with old schema.
This simulates an existing db with the old schema.
"""
engine = create_engine(*args, **kwargs)
models_original.Base.metadata.create_all(engine)
return engine
async def test_schema_update_calls(hass): async def test_schema_update_calls(hass):
"""Test that schema migrations occur in correct order.""" """Test that schema migrations occur in correct order."""
assert await recorder.async_migration_in_progress(hass) is False assert await recorder.async_migration_in_progress(hass) is False

View File

@ -1,18 +1,29 @@
"""The tests for sensor recorder platform.""" """The tests for sensor recorder platform."""
# pylint: disable=protected-access,invalid-name # pylint: disable=protected-access,invalid-name
from datetime import timedelta from datetime import timedelta
import threading
from unittest.mock import patch
import pytest import pytest
from pytest import approx from pytest import approx
from homeassistant.components import recorder
from homeassistant.components.recorder.const import DATA_INSTANCE from homeassistant.components.recorder.const import DATA_INSTANCE
from homeassistant.setup import async_setup_component from homeassistant.setup import async_setup_component
import homeassistant.util.dt as dt_util import homeassistant.util.dt as dt_util
from homeassistant.util.unit_system import METRIC_SYSTEM from homeassistant.util.unit_system import METRIC_SYSTEM
from .common import trigger_db_commit from .common import (
async_wait_recording_done_without_instance,
create_engine_test,
trigger_db_commit,
)
from tests.common import init_recorder_component from tests.common import (
async_fire_time_changed,
async_init_recorder_component,
init_recorder_component,
)
POWER_SENSOR_ATTRIBUTES = { POWER_SENSOR_ATTRIBUTES = {
"device_class": "power", "device_class": "power",
@ -237,3 +248,113 @@ async def test_update_statistics_metadata(hass, hass_ws_client, new_unit):
"unit_of_measurement": new_unit, "unit_of_measurement": new_unit,
} }
] ]
async def test_recorder_info(hass, hass_ws_client):
"""Test getting recorder status."""
client = await hass_ws_client()
await async_init_recorder_component(hass)
# Ensure there are no queued events
await async_wait_recording_done_without_instance(hass)
await client.send_json({"id": 1, "type": "recorder/info"})
response = await client.receive_json()
assert response["success"]
assert response["result"] == {
"backlog": 0,
"max_backlog": 30000,
"migration_in_progress": False,
"recording": True,
"thread_running": True,
}
async def test_recorder_info_no_recorder(hass, hass_ws_client):
"""Test getting recorder status when recorder is not present."""
client = await hass_ws_client()
await client.send_json({"id": 1, "type": "recorder/info"})
response = await client.receive_json()
assert not response["success"]
assert response["error"]["code"] == "unknown_command"
async def test_recorder_info_bad_recorder_config(hass, hass_ws_client):
"""Test getting recorder status when recorder is not started."""
config = {recorder.CONF_DB_URL: "sqlite://no_file", recorder.CONF_DB_RETRY_WAIT: 0}
client = await hass_ws_client()
with patch("homeassistant.components.recorder.migration.migrate_schema"):
assert not await async_setup_component(
hass, recorder.DOMAIN, {recorder.DOMAIN: config}
)
assert recorder.DOMAIN not in hass.config.components
await hass.async_block_till_done()
# Wait for recorder to shut down
await hass.async_add_executor_job(hass.data[DATA_INSTANCE].join)
await client.send_json({"id": 1, "type": "recorder/info"})
response = await client.receive_json()
assert response["success"]
assert response["result"]["recording"] is False
assert response["result"]["thread_running"] is False
async def test_recorder_info_migration_queue_exhausted(hass, hass_ws_client):
"""Test getting recorder status when recorder queue is exhausted."""
assert await recorder.async_migration_in_progress(hass) is False
migration_done = threading.Event()
real_migration = recorder.migration.migrate_schema
def stalled_migration(*args):
"""Make migration stall."""
nonlocal migration_done
migration_done.wait()
return real_migration(*args)
with patch(
"homeassistant.components.recorder.Recorder.async_periodic_statistics"
), patch(
"homeassistant.components.recorder.create_engine", new=create_engine_test
), patch.object(
recorder, "MAX_QUEUE_BACKLOG", 1
), patch(
"homeassistant.components.recorder.migration.migrate_schema",
wraps=stalled_migration,
):
await async_setup_component(
hass, "recorder", {"recorder": {"db_url": "sqlite://"}}
)
hass.states.async_set("my.entity", "on", {})
await hass.async_block_till_done()
# Detect queue full
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(hours=2))
await hass.async_block_till_done()
client = await hass_ws_client()
# Check the status
await client.send_json({"id": 1, "type": "recorder/info"})
response = await client.receive_json()
assert response["success"]
assert response["result"]["migration_in_progress"] is True
assert response["result"]["recording"] is False
assert response["result"]["thread_running"] is True
# Let migration finish
migration_done.set()
await async_wait_recording_done_without_instance(hass)
# Check the status after migration finished
await client.send_json({"id": 2, "type": "recorder/info"})
response = await client.receive_json()
assert response["success"]
assert response["result"]["migration_in_progress"] is False
assert response["result"]["recording"] is True
assert response["result"]["thread_running"] is True