From 43596f22a41f6e0bb9f1f8ca1aa5d247833ce984 Mon Sep 17 00:00:00 2001
From: "J. Nick Koston"
Date: Thu, 11 Jul 2024 02:09:35 -0500
Subject: [PATCH] Fix recorder max backlog calculation being too small (#121739)

---
 homeassistant/components/recorder/const.py    |  3 +-
 homeassistant/components/recorder/core.py     | 33 +++++++------------
 tests/components/recorder/test_init.py        | 33 +++++++++++++++----
 tests/components/recorder/test_migrate.py     |  4 ++-
 .../components/recorder/test_websocket_api.py |  5 ++-
 5 files changed, 47 insertions(+), 31 deletions(-)

diff --git a/homeassistant/components/recorder/const.py b/homeassistant/components/recorder/const.py
index f2af5306ded..00121608b4c 100644
--- a/homeassistant/components/recorder/const.py
+++ b/homeassistant/components/recorder/const.py
@@ -32,8 +32,7 @@ DOMAIN = "recorder"
 CONF_DB_INTEGRITY_CHECK = "db_integrity_check"
 
 MAX_QUEUE_BACKLOG_MIN_VALUE = 65000
-ESTIMATED_QUEUE_ITEM_SIZE = 10240
-QUEUE_PERCENTAGE_ALLOWED_AVAILABLE_MEMORY = 0.65
+MIN_AVAILABLE_MEMORY_FOR_QUEUE_BACKLOG = 256 * 1024**2
 
 
 # The maximum number of rows (events) we purge in one delete statement
diff --git a/homeassistant/components/recorder/core.py b/homeassistant/components/recorder/core.py
index 28291de3be8..db9f4239480 100644
--- a/homeassistant/components/recorder/core.py
+++ b/homeassistant/components/recorder/core.py
@@ -53,16 +53,15 @@ from . import migration, statistics
 from .const import (
     DB_WORKER_PREFIX,
     DOMAIN,
-    ESTIMATED_QUEUE_ITEM_SIZE,
     KEEPALIVE_TIME,
     LAST_REPORTED_SCHEMA_VERSION,
     LEGACY_STATES_EVENT_ID_INDEX_SCHEMA_VERSION,
     MARIADB_PYMYSQL_URL_PREFIX,
     MARIADB_URL_PREFIX,
     MAX_QUEUE_BACKLOG_MIN_VALUE,
+    MIN_AVAILABLE_MEMORY_FOR_QUEUE_BACKLOG,
     MYSQLDB_PYMYSQL_URL_PREFIX,
     MYSQLDB_URL_PREFIX,
-    QUEUE_PERCENTAGE_ALLOWED_AVAILABLE_MEMORY,
     SQLITE_MAX_BIND_VARS,
     SQLITE_URL_PREFIX,
     STATISTICS_ROWS_SCHEMA_VERSION,
@@ -156,6 +155,7 @@ ADJUST_LRU_SIZE_TASK = AdjustLRUSizeTask()
 
 DB_LOCK_TIMEOUT = 30
 DB_LOCK_QUEUE_CHECK_TIMEOUT = 10  # check every 10 seconds
+QUEUE_CHECK_INTERVAL = timedelta(minutes=5)
 
 INVALIDATED_ERR = "Database connection invalidated"
 CONNECTIVITY_ERR = "Error in database connectivity during commit"
@@ -347,7 +347,7 @@ class Recorder(threading.Thread):
         self._queue_watcher = async_track_time_interval(
             self.hass,
             self._async_check_queue,
-            timedelta(minutes=10),
+            QUEUE_CHECK_INTERVAL,
             name="Recorder queue watcher",
         )
 
@@ -387,9 +387,8 @@ class Recorder(threading.Thread):
 
         The queue grows during migration or if something really goes wrong.
         """
-        size = self.backlog
-        _LOGGER.debug("Recorder queue size is: %s", size)
-        if not self._reached_max_backlog_percentage(100):
+        _LOGGER.debug("Recorder queue size is: %s", self.backlog)
+        if not self._reached_max_backlog():
             return
         _LOGGER.error(
             (
@@ -408,22 +407,15 @@
             self._psutil = ha_psutil.PsutilWrapper()
         return cast(int, self._psutil.psutil.virtual_memory().available)
 
-    def _reached_max_backlog_percentage(self, percentage: int) -> bool:
-        """Check if the system has reached the max queue backlog and return the maximum if it has."""
-        percentage_modifier = percentage / 100
-        current_backlog = self.backlog
+    def _reached_max_backlog(self) -> bool:
+        """Check if the system has reached the max queue backlog and return True if it has."""
         # First check the minimum value since its cheap
-        if current_backlog < (MAX_QUEUE_BACKLOG_MIN_VALUE * percentage_modifier):
+        if self.backlog < MAX_QUEUE_BACKLOG_MIN_VALUE:
             return False
         # If they have more RAM available, keep filling the backlog
         # since we do not want to stop recording events or give the
         # user a bad backup when they have plenty of RAM available.
-        max_queue_backlog = int(
-            QUEUE_PERCENTAGE_ALLOWED_AVAILABLE_MEMORY
-            * (self._available_memory() / ESTIMATED_QUEUE_ITEM_SIZE)
-        )
-        self.max_backlog = max(max_queue_backlog, MAX_QUEUE_BACKLOG_MIN_VALUE)
-        return current_backlog >= (max_queue_backlog * percentage_modifier)
+        return self._available_memory() < MIN_AVAILABLE_MEMORY_FOR_QUEUE_BACKLOG
 
     @callback
     def _async_stop_queue_watcher_and_event_listener(self) -> None:
@@ -1019,13 +1011,12 @@
             # Notify that lock is being held, wait until database can be used again.
             hass.add_job(_async_set_database_locked, task)
             while not task.database_unlock.wait(timeout=DB_LOCK_QUEUE_CHECK_TIMEOUT):
-                if self._reached_max_backlog_percentage(90):
+                if self._reached_max_backlog():
                     _LOGGER.warning(
-                        "Database queue backlog reached more than %s (%s events) of maximum queue "
-                        "length while waiting for backup to finish; recorder will now "
+                        "Database queue backlog reached more than %s events "
+                        "while waiting for backup to finish; recorder will now "
                         "resume writing to database. The backup cannot be trusted and "
                         "must be restarted",
-                        "90%",
                         self.backlog,
                     )
                     task.queue_overflow = True
diff --git a/tests/components/recorder/test_init.py b/tests/components/recorder/test_init.py
index 8e28e15fdf7..cc83bad5500 100644
--- a/tests/components/recorder/test_init.py
+++ b/tests/components/recorder/test_init.py
@@ -6,6 +6,7 @@ import asyncio
 from collections.abc import Generator
 from datetime import datetime, timedelta
 import sqlite3
+import sys
 import threading
 from typing import Any, cast
 from unittest.mock import MagicMock, Mock, patch
@@ -1883,7 +1884,9 @@
     with (
         patch.object(recorder.core, "MAX_QUEUE_BACKLOG_MIN_VALUE", 1),
        patch.object(recorder.core, "DB_LOCK_QUEUE_CHECK_TIMEOUT", 0.01),
-        patch.object(recorder.core, "QUEUE_PERCENTAGE_ALLOWED_AVAILABLE_MEMORY", 0),
+        patch.object(
+            recorder.core, "MIN_AVAILABLE_MEMORY_FOR_QUEUE_BACKLOG", sys.maxsize
+        ),
     ):
         await async_setup_recorder_instance(hass, config)
         await hass.async_block_till_done()
@@ -1943,26 +1946,43 @@ async def test_database_lock_and_overflow_checks_available_memory(
         )
     )
 
-    await async_setup_recorder_instance(hass, config)
-    await hass.async_block_till_done()
+    with patch(
+        "homeassistant.components.recorder.core.QUEUE_CHECK_INTERVAL",
+        timedelta(seconds=1),
+    ):
+        await async_setup_recorder_instance(hass, config)
+        await hass.async_block_till_done()
     event_type = "EVENT_TEST"
     event_types = (event_type,)
 
     await async_wait_recording_done(hass)
+    min_available_memory = 256 * 1024**2
+
+    out_of_ram = False
+
+    def _get_available_memory(*args: Any, **kwargs: Any) -> int:
+        nonlocal out_of_ram
+        return min_available_memory / 2 if out_of_ram else min_available_memory
 
     with (
         patch.object(recorder.core, "MAX_QUEUE_BACKLOG_MIN_VALUE", 1),
-        patch.object(recorder.core, "QUEUE_PERCENTAGE_ALLOWED_AVAILABLE_MEMORY", 1),
+        patch.object(
+            recorder.core,
+            "MIN_AVAILABLE_MEMORY_FOR_QUEUE_BACKLOG",
+            min_available_memory,
+        ),
         patch.object(recorder.core, "DB_LOCK_QUEUE_CHECK_TIMEOUT", 0.01),
         patch.object(
             recorder.core.Recorder,
             "_available_memory",
-            return_value=recorder.core.ESTIMATED_QUEUE_ITEM_SIZE * 4,
+            side_effect=_get_available_memory,
         ),
     ):
         instance = get_instance(hass)
-        await instance.lock_database()
+        assert await instance.lock_database()
 
+        db_events = await instance.async_add_executor_job(_get_db_events)
+        assert len(db_events) == 0
         # Record up to the extended limit (which takes into account the available memory)
         for _ in range(2):
             event_data = {"test_attr": 5, "test_attr_10": "nice"}
@@ -1979,6 +1999,7 @@
 
     assert "Database queue backlog reached more than" not in caplog.text
 
+    out_of_ram = True
     # Record beyond the extended limit (which takes into account the available memory)
     for _ in range(20):
         event_data = {"test_attr": 5, "test_attr_10": "nice"}
diff --git a/tests/components/recorder/test_migrate.py b/tests/components/recorder/test_migrate.py
index 25fe8993cfb..3940721b99b 100644
--- a/tests/components/recorder/test_migrate.py
+++ b/tests/components/recorder/test_migrate.py
@@ -292,7 +292,9 @@
             new=create_engine_test,
         ),
         patch.object(recorder.core, "MAX_QUEUE_BACKLOG_MIN_VALUE", 1),
-        patch.object(recorder.core, "QUEUE_PERCENTAGE_ALLOWED_AVAILABLE_MEMORY", 0),
+        patch.object(
+            recorder.core, "MIN_AVAILABLE_MEMORY_FOR_QUEUE_BACKLOG", sys.maxsize
+        ),
     ):
         await async_setup_recorder_instance(
             hass, {"commit_interval": 0}, wait_recorder=False
diff --git a/tests/components/recorder/test_websocket_api.py b/tests/components/recorder/test_websocket_api.py
index 508848b9cc7..7467ebe5c4c 100644
--- a/tests/components/recorder/test_websocket_api.py
+++ b/tests/components/recorder/test_websocket_api.py
@@ -3,6 +3,7 @@
 import datetime
 from datetime import timedelta
 from statistics import fmean
+import sys
 from unittest.mock import ANY, patch
 
 from freezegun import freeze_time
@@ -2515,7 +2516,9 @@
             new=create_engine_test,
         ),
         patch.object(recorder.core, "MAX_QUEUE_BACKLOG_MIN_VALUE", 1),
-        patch.object(recorder.core, "QUEUE_PERCENTAGE_ALLOWED_AVAILABLE_MEMORY", 0),
+        patch.object(
+            recorder.core, "MIN_AVAILABLE_MEMORY_FOR_QUEUE_BACKLOG", sys.maxsize
+        ),
     ):
         async with async_test_recorder(hass, wait_recorder=False):
             await hass.async_add_executor_job(
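
Reviewer note: the patch replaces the old proportional cap, 0.65 * available_memory / 10240 (an estimated 10 KiB per queue item), with a simpler rule: the queue may keep growing past MAX_QUEUE_BACKLOG_MIN_VALUE (65000 items) as long as at least 256 MiB of RAM remains available. The sketch below is a minimal standalone version of that new check, assuming plain psutil in place of Home Assistant's ha_psutil wrapper; reached_max_backlog is a hypothetical free-function stand-in for the Recorder._reached_max_backlog method added above.

import psutil

# Constants copied from homeassistant/components/recorder/const.py (post-patch).
MAX_QUEUE_BACKLOG_MIN_VALUE = 65000
MIN_AVAILABLE_MEMORY_FOR_QUEUE_BACKLOG = 256 * 1024**2  # 256 MiB


def reached_max_backlog(backlog: int) -> bool:
    """Return True if the recorder queue should be treated as overflowing."""
    # Cheap check first: a backlog below the minimum never overflows.
    if backlog < MAX_QUEUE_BACKLOG_MIN_VALUE:
        return False
    # A large backlog is still acceptable while at least 256 MiB of RAM is
    # free, so a slow backup or migration does not drop events needlessly.
    return psutil.virtual_memory().available < MIN_AVAILABLE_MEMORY_FOR_QUEUE_BACKLOG


# Example: on a host with plenty of free RAM this prints False even for a
# backlog well past the 65000-item floor, which is the case the old
# percentage-based calculation handled poorly on low-memory systems.
print(reached_max_backlog(100_000))

The same rule is now used both by the periodic queue watcher (every QUEUE_CHECK_INTERVAL, reduced from 10 to 5 minutes) and by the backup lock path that previously fired at 90% of the computed maximum.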