diff --git a/homeassistant/components/recorder/const.py b/homeassistant/components/recorder/const.py
index fbec19a2d1e..ec5c5c984b5 100644
--- a/homeassistant/components/recorder/const.py
+++ b/homeassistant/components/recorder/const.py
@@ -19,7 +19,9 @@ EVENT_RECORDER_HOURLY_STATISTICS_GENERATED = "recorder_hourly_statistics_generat
 CONF_DB_INTEGRITY_CHECK = "db_integrity_check"
 
-MAX_QUEUE_BACKLOG = 65000
+MAX_QUEUE_BACKLOG_MIN_VALUE = 65000
+ESTIMATED_QUEUE_ITEM_SIZE = 10240
+QUEUE_PERCENTAGE_ALLOWED_AVAILABLE_MEMORY = 0.65
 
 # The maximum number of rows (events) we purge in one delete statement
diff --git a/homeassistant/components/recorder/core.py b/homeassistant/components/recorder/core.py
index 0858b3f93e2..2491c0a9baa 100644
--- a/homeassistant/components/recorder/core.py
+++ b/homeassistant/components/recorder/core.py
@@ -11,9 +11,10 @@ import queue
 import sqlite3
 import threading
 import time
-from typing import Any, TypeVar
+from typing import Any, TypeVar, cast
 
 import async_timeout
+import psutil_home_assistant as ha_psutil
 from sqlalchemy import create_engine, event as sqlalchemy_event, exc, select
 from sqlalchemy.engine import Engine
 from sqlalchemy.engine.interfaces import DBAPIConnection
@@ -45,14 +46,16 @@ from .const import (
     CONTEXT_ID_AS_BINARY_SCHEMA_VERSION,
     DB_WORKER_PREFIX,
     DOMAIN,
+    ESTIMATED_QUEUE_ITEM_SIZE,
     EVENT_TYPE_IDS_SCHEMA_VERSION,
     KEEPALIVE_TIME,
     LEGACY_STATES_EVENT_ID_INDEX_SCHEMA_VERSION,
     MARIADB_PYMYSQL_URL_PREFIX,
     MARIADB_URL_PREFIX,
-    MAX_QUEUE_BACKLOG,
+    MAX_QUEUE_BACKLOG_MIN_VALUE,
     MYSQLDB_PYMYSQL_URL_PREFIX,
     MYSQLDB_URL_PREFIX,
+    QUEUE_PERCENTAGE_ALLOWED_AVAILABLE_MEMORY,
     SQLITE_URL_PREFIX,
     STATES_META_SCHEMA_VERSION,
     STATISTICS_ROWS_SCHEMA_VERSION,
@@ -148,7 +151,7 @@ WAIT_TASK = WaitTask()
 ADJUST_LRU_SIZE_TASK = AdjustLRUSizeTask()
 
 DB_LOCK_TIMEOUT = 30
-DB_LOCK_QUEUE_CHECK_TIMEOUT = 1
+DB_LOCK_QUEUE_CHECK_TIMEOUT = 10  # check every 10 seconds
 
 INVALIDATED_ERR = "Database connection invalidated"
@@ -201,6 +204,8 @@ class Recorder(threading.Thread):
         self.async_recorder_ready = asyncio.Event()
         self._queue_watch = threading.Event()
         self.engine: Engine | None = None
+        self.max_backlog: int = MAX_QUEUE_BACKLOG_MIN_VALUE
+        self._psutil: ha_psutil.PsutilWrapper | None = None
 
         # The entity_filter is exposed on the recorder instance so that
         # it can be used to see if an entity is being recorded and is called
@@ -343,7 +348,7 @@ class Recorder(threading.Thread):
         """
         size = self.backlog
         _LOGGER.debug("Recorder queue size is: %s", size)
-        if size <= MAX_QUEUE_BACKLOG:
+        if not self._reached_max_backlog_percentage(100):
             return
         _LOGGER.error(
             (
@@ -352,10 +357,33 @@ class Recorder(threading.Thread):
                 "is corrupt due to a disk problem; The recorder will stop "
                 "recording events to avoid running out of memory"
             ),
-            MAX_QUEUE_BACKLOG,
+            self.backlog,
         )
         self._async_stop_queue_watcher_and_event_listener()
 
+    def _available_memory(self) -> int:
+        """Return the available memory in bytes."""
+        if not self._psutil:
+            self._psutil = ha_psutil.PsutilWrapper()
+        return cast(int, self._psutil.psutil.virtual_memory().available)
+
+    def _reached_max_backlog_percentage(self, percentage: int) -> bool:
+        """Check if the backlog has reached the given percentage of the maximum, updating self.max_backlog."""
+        percentage_modifier = percentage / 100
+        current_backlog = self.backlog
+        # First check the minimum value since it's cheap
+        if current_backlog < (MAX_QUEUE_BACKLOG_MIN_VALUE * percentage_modifier):
+            return False
+        # If they have more RAM available, keep filling the backlog
+        # since we do not want to stop recording events or give the
+        # user a bad backup when they have plenty of RAM available.
+        max_queue_backlog = int(
+            QUEUE_PERCENTAGE_ALLOWED_AVAILABLE_MEMORY
+            * (self._available_memory() / ESTIMATED_QUEUE_ITEM_SIZE)
+        )
+        self.max_backlog = max(max_queue_backlog, MAX_QUEUE_BACKLOG_MIN_VALUE)
+        return current_backlog >= (max_queue_backlog * percentage_modifier)
+
     @callback
     def _async_stop_queue_watcher_and_event_listener(self) -> None:
         """Stop watching the queue and listening for events."""
@@ -705,8 +733,8 @@ class Recorder(threading.Thread):
                     self.schema_version = SCHEMA_VERSION
                     if not self._event_listener:
                         # If the schema migration takes so long that the end
-                        # queue watcher safety kicks in because MAX_QUEUE_BACKLOG
-                        # is reached, we need to reinitialize the listener.
+                        # queue watcher safety kicks in because _reached_max_backlog_percentage
+                        # was True, we need to reinitialize the listener.
                         self.hass.add_job(self.async_initialize)
                 else:
                     persistent_notification.create(
@@ -946,12 +974,14 @@ class Recorder(threading.Thread):
         # Notify that lock is being held, wait until database can be used again.
         self.hass.add_job(_async_set_database_locked, task)
         while not task.database_unlock.wait(timeout=DB_LOCK_QUEUE_CHECK_TIMEOUT):
-            if self.backlog > MAX_QUEUE_BACKLOG * 0.9:
+            if self._reached_max_backlog_percentage(90):
                 _LOGGER.warning(
-                    "Database queue backlog reached more than 90% of maximum queue "
+                    "Database queue backlog reached more than %s (%s events) of maximum queue "
                     "length while waiting for backup to finish; recorder will now "
                     "resume writing to database. The backup cannot be trusted and "
-                    "must be restarted"
+                    "must be restarted",
+                    "90%",
+                    self.backlog,
                 )
                 task.queue_overflow = True
                 break
diff --git a/homeassistant/components/recorder/manifest.json b/homeassistant/components/recorder/manifest.json
index a0caa9d3c6e..289299ee11f 100644
--- a/homeassistant/components/recorder/manifest.json
+++ b/homeassistant/components/recorder/manifest.json
@@ -6,5 +6,9 @@
   "integration_type": "system",
   "iot_class": "local_push",
   "quality_scale": "internal",
-  "requirements": ["sqlalchemy==2.0.9", "fnv-hash-fast==0.3.1"]
+  "requirements": [
+    "sqlalchemy==2.0.9",
+    "fnv-hash-fast==0.3.1",
+    "psutil-home-assistant==0.0.1"
+  ]
 }
diff --git a/homeassistant/components/recorder/websocket_api.py b/homeassistant/components/recorder/websocket_api.py
index df42c519fe2..c52df1b25e3 100644
--- a/homeassistant/components/recorder/websocket_api.py
+++ b/homeassistant/components/recorder/websocket_api.py
@@ -30,7 +30,6 @@ from homeassistant.util.unit_conversion import (
     VolumeConverter,
 )
 
-from .const import MAX_QUEUE_BACKLOG
 from .models import StatisticPeriod
 from .statistics import (
     STATISTIC_UNIT_TO_UNIT_CONVERTER,
@@ -504,7 +503,7 @@ def ws_info(
     recorder_info = {
         "backlog": backlog,
-        "max_backlog": MAX_QUEUE_BACKLOG,
+        "max_backlog": instance.max_backlog,
         "migration_in_progress": migration_in_progress,
         "migration_is_live": migration_is_live,
         "recording": recording,
diff --git a/requirements_all.txt b/requirements_all.txt
index e352313537e..a15eb60b87a 100644
--- a/requirements_all.txt
+++ b/requirements_all.txt
@@ -1401,6 +1401,7 @@ prometheus_client==0.7.1
 proxmoxer==2.0.1
 
 # homeassistant.components.hardware
+# homeassistant.components.recorder
 psutil-home-assistant==0.0.1
 
 # homeassistant.components.systemmonitor
diff --git a/requirements_test_all.txt b/requirements_test_all.txt
index 3c98ddf35cd..0cbf33a0fe4 100644
--- a/requirements_test_all.txt
+++ b/requirements_test_all.txt
@@ -1034,6 +1034,7 @@ progettihwsw==0.1.1
 prometheus_client==0.7.1
 
 # homeassistant.components.hardware
+# homeassistant.components.recorder
 psutil-home-assistant==0.0.1
 
 # homeassistant.components.androidtv
diff --git a/tests/components/recorder/test_init.py b/tests/components/recorder/test_init.py
index 476d7dff41a..84dbb9a2816 100644
--- a/tests/components/recorder/test_init.py
+++ b/tests/components/recorder/test_init.py
@@ -450,7 +450,7 @@ async def test_force_shutdown_with_queue_of_writes_that_generate_exceptions(
 
     await async_wait_recording_done(hass)
 
-    with patch.object(instance, "db_retry_wait", 0.05), patch.object(
+    with patch.object(instance, "db_retry_wait", 0.01), patch.object(
         instance.event_session,
         "flush",
         side_effect=OperationalError(
@@ -1757,7 +1757,7 @@ async def test_database_lock_and_unlock(
 
     # Recording can't be finished while lock is held
     with pytest.raises(asyncio.TimeoutError):
-        await asyncio.wait_for(asyncio.shield(task), timeout=1)
+        await asyncio.wait_for(asyncio.shield(task), timeout=0.25)
 
     db_events = await hass.async_add_executor_job(_get_db_events)
     assert len(db_events) == 0
@@ -1773,11 +1773,12 @@ async def test_database_lock_and_overflow(
     hass: HomeAssistant,
     recorder_db_url: str,
     tmp_path: Path,
+    caplog: pytest.LogCaptureFixture,
 ) -> None:
     """Test writing events during lock leading to overflow the queue causes the database to unlock."""
     if recorder_db_url.startswith(("mysql://", "postgresql://")):
         # Database locking is only used for SQLite
-        return
+        return pytest.skip("Database locking is only used for SQLite")
 
     # Use file DB, in memory DB cannot do write locks.
     if recorder_db_url == "sqlite://":
@@ -1787,10 +1788,6 @@ async def test_database_lock_and_overflow(
         recorder.CONF_COMMIT_INTERVAL: 0,
         recorder.CONF_DB_URL: recorder_db_url,
     }
-    await async_setup_recorder_instance(hass, config)
-    await hass.async_block_till_done()
-    event_type = "EVENT_TEST"
-    event_types = (event_type,)
 
     def _get_db_events():
         with session_scope(hass=hass, read_only=True) as session:
             return list(
                 session.query(Events).filter(
                     Events.event_type_id.in_(select_event_type_ids(event_types))
                 )
             )
 
@@ -1800,26 +1797,110 @@ async def test_database_lock_and_overflow(
-    instance = get_instance(hass)
+    with patch.object(recorder.core, "MAX_QUEUE_BACKLOG_MIN_VALUE", 1), patch.object(
+        recorder.core, "DB_LOCK_QUEUE_CHECK_TIMEOUT", 0.01
+    ), patch.object(recorder.core, "QUEUE_PERCENTAGE_ALLOWED_AVAILABLE_MEMORY", 0):
+        await async_setup_recorder_instance(hass, config)
+        await hass.async_block_till_done()
+        event_type = "EVENT_TEST"
+        event_types = (event_type,)
+
+        instance = get_instance(hass)
 
-    with patch.object(recorder.core, "MAX_QUEUE_BACKLOG", 1), patch.object(
-        recorder.core, "DB_LOCK_QUEUE_CHECK_TIMEOUT", 0.1
-    ):
         await instance.lock_database()
 
         event_data = {"test_attr": 5, "test_attr_10": "nice"}
-        hass.bus.fire(event_type, event_data)
+        hass.bus.async_fire(event_type, event_data)
 
         # Check that this causes the queue to overflow and write succeeds
         # even before unlocking.
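+        # (a single fired event is enough to overflow here, since
+        # MAX_QUEUE_BACKLOG_MIN_VALUE is patched to 1 and
+        # QUEUE_PERCENTAGE_ALLOWED_AVAILABLE_MEMORY to 0)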
         await async_wait_recording_done(hass)
 
-        db_events = await hass.async_add_executor_job(_get_db_events)
+        db_events = await instance.async_add_executor_job(_get_db_events)
         assert len(db_events) == 1
+        assert "Database queue backlog reached more than" in caplog.text
 
         assert not instance.unlock_database()
 
 
+async def test_database_lock_and_overflow_checks_available_memory(
+    async_setup_recorder_instance: RecorderInstanceGenerator,
+    hass: HomeAssistant,
+    recorder_db_url: str,
+    tmp_path: Path,
+    caplog: pytest.LogCaptureFixture,
+) -> None:
+    """Test that available memory extends the queue backlog limit before overflow unlocks the database."""
+    if recorder_db_url.startswith(("mysql://", "postgresql://")):
+        return pytest.skip("Database locking is only used for SQLite")
+
+    # Use file DB, in memory DB cannot do write locks.
+    if recorder_db_url == "sqlite://":
+        # Use file DB, in memory DB cannot do write locks.
+        recorder_db_url = "sqlite:///" + str(tmp_path / "pytest.db")
+    config = {
+        recorder.CONF_COMMIT_INTERVAL: 0,
+        recorder.CONF_DB_URL: recorder_db_url,
+    }
+
+    def _get_db_events():
+        with session_scope(hass=hass, read_only=True) as session:
+            return list(
+                session.query(Events).filter(
+                    Events.event_type_id.in_(select_event_type_ids(event_types))
+                )
+            )
+
+    await async_setup_recorder_instance(hass, config)
+    await hass.async_block_till_done()
+    event_type = "EVENT_TEST"
+    event_types = (event_type,)
+    await async_wait_recording_done(hass)
+
+    with patch.object(recorder.core, "MAX_QUEUE_BACKLOG_MIN_VALUE", 1), patch.object(
+        recorder.core, "QUEUE_PERCENTAGE_ALLOWED_AVAILABLE_MEMORY", 1
+    ), patch.object(recorder.core, "DB_LOCK_QUEUE_CHECK_TIMEOUT", 0.01), patch.object(
+        recorder.core.Recorder,
+        "_available_memory",
+        return_value=recorder.core.ESTIMATED_QUEUE_ITEM_SIZE * 4,
+    ):
+        instance = get_instance(hass)
+
+        await instance.lock_database()
+
+        # Record up to the extended limit (which takes into account the available memory)
+        for _ in range(2):
+            event_data = {"test_attr": 5, "test_attr_10": "nice"}
+            hass.bus.async_fire(event_type, event_data)
+
+        def _wait_database_unlocked():
+            return instance._database_lock_task.database_unlock.wait(0.2)
+
+        database_unlocked = await hass.async_add_executor_job(_wait_database_unlocked)
+        assert not database_unlocked
+
+        db_events = await instance.async_add_executor_job(_get_db_events)
+        assert len(db_events) == 0
+
+        assert "Database queue backlog reached more than" not in caplog.text
+
+        # Record beyond the extended limit (which takes into account the available memory)
+        for _ in range(20):
+            event_data = {"test_attr": 5, "test_attr_10": "nice"}
+            hass.bus.async_fire(event_type, event_data)
+
+        # Check that this causes the queue to overflow and write succeeds
+        # even before unlocking.
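+        # (with the mocked available memory the extended limit is only four
+        # queued items, so 20 events are guaranteed to overflow it)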
+ await async_wait_recording_done(hass) + + assert not instance.unlock_database() + + assert "Database queue backlog reached more than" in caplog.text + + db_events = await instance.async_add_executor_job(_get_db_events) + assert len(db_events) >= 2 + + async def test_database_lock_timeout( recorder_mock: Recorder, hass: HomeAssistant, recorder_db_url: str ) -> None: diff --git a/tests/components/recorder/test_migrate.py b/tests/components/recorder/test_migrate.py index da56d5fd04f..ede5bc32a6f 100644 --- a/tests/components/recorder/test_migrate.py +++ b/tests/components/recorder/test_migrate.py @@ -259,7 +259,9 @@ async def test_events_during_migration_queue_exhausted( with patch("homeassistant.components.recorder.ALLOW_IN_MEMORY_DB", True), patch( "homeassistant.components.recorder.core.create_engine", new=create_engine_test, - ), patch.object(recorder.core, "MAX_QUEUE_BACKLOG", 1): + ), patch.object(recorder.core, "MAX_QUEUE_BACKLOG_MIN_VALUE", 1), patch.object( + recorder.core, "QUEUE_PERCENTAGE_ALLOWED_AVAILABLE_MEMORY", 0 + ): recorder_helper.async_initialize_recorder(hass) await async_setup_component( hass, diff --git a/tests/components/recorder/test_websocket_api.py b/tests/components/recorder/test_websocket_api.py index 8e760b40100..335bdafd643 100644 --- a/tests/components/recorder/test_websocket_api.py +++ b/tests/components/recorder/test_websocket_api.py @@ -2203,7 +2203,9 @@ async def test_recorder_info_migration_queue_exhausted( "homeassistant.components.recorder.core.create_engine", new=create_engine_test, ), patch.object( - recorder.core, "MAX_QUEUE_BACKLOG", 1 + recorder.core, "MAX_QUEUE_BACKLOG_MIN_VALUE", 1 + ), patch.object( + recorder.core, "QUEUE_PERCENTAGE_ALLOWED_AVAILABLE_MEMORY", 0 ), patch( "homeassistant.components.recorder.migration._apply_update", wraps=stalled_migration,
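
For context, the sizing rule the new constants encode, as a minimal standalone sketch. It uses plain psutil directly for illustration; the integration itself goes through the psutil-home-assistant wrapper cached on the recorder instance:

```python
# Illustrative sketch only: mirrors the arithmetic in
# Recorder._reached_max_backlog_percentage. Let the queue grow into 65% of
# currently available memory, assuming ~10 KiB per queued item, but never
# below the static 65k floor.
import psutil

MAX_QUEUE_BACKLOG_MIN_VALUE = 65000
ESTIMATED_QUEUE_ITEM_SIZE = 10240
QUEUE_PERCENTAGE_ALLOWED_AVAILABLE_MEMORY = 0.65


def max_backlog() -> int:
    """Return the largest queue backlog this host should allow."""
    available = psutil.virtual_memory().available
    dynamic = int(
        QUEUE_PERCENTAGE_ALLOWED_AVAILABLE_MEMORY
        * (available / ESTIMATED_QUEUE_ITEM_SIZE)
    )
    return max(dynamic, MAX_QUEUE_BACKLOG_MIN_VALUE)


if __name__ == "__main__":
    # With ~8 GiB available this prints roughly 545000; below about 1 GiB of
    # available memory the static 65000 floor wins.
    print(max_backlog())
```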