Fix recorder max backlog calculation being too small (#121739)

This commit is contained in:
J. Nick Koston 2024-07-11 02:09:35 -05:00 committed by GitHub
parent 67fee5be31
commit 43596f22a4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 47 additions and 31 deletions

View File

@ -32,8 +32,7 @@ DOMAIN = "recorder"
CONF_DB_INTEGRITY_CHECK = "db_integrity_check" CONF_DB_INTEGRITY_CHECK = "db_integrity_check"
MAX_QUEUE_BACKLOG_MIN_VALUE = 65000 MAX_QUEUE_BACKLOG_MIN_VALUE = 65000
ESTIMATED_QUEUE_ITEM_SIZE = 10240 MIN_AVAILABLE_MEMORY_FOR_QUEUE_BACKLOG = 256 * 1024**2
QUEUE_PERCENTAGE_ALLOWED_AVAILABLE_MEMORY = 0.65
# The maximum number of rows (events) we purge in one delete statement # The maximum number of rows (events) we purge in one delete statement

View File

@ -53,16 +53,15 @@ from . import migration, statistics
from .const import ( from .const import (
DB_WORKER_PREFIX, DB_WORKER_PREFIX,
DOMAIN, DOMAIN,
ESTIMATED_QUEUE_ITEM_SIZE,
KEEPALIVE_TIME, KEEPALIVE_TIME,
LAST_REPORTED_SCHEMA_VERSION, LAST_REPORTED_SCHEMA_VERSION,
LEGACY_STATES_EVENT_ID_INDEX_SCHEMA_VERSION, LEGACY_STATES_EVENT_ID_INDEX_SCHEMA_VERSION,
MARIADB_PYMYSQL_URL_PREFIX, MARIADB_PYMYSQL_URL_PREFIX,
MARIADB_URL_PREFIX, MARIADB_URL_PREFIX,
MAX_QUEUE_BACKLOG_MIN_VALUE, MAX_QUEUE_BACKLOG_MIN_VALUE,
MIN_AVAILABLE_MEMORY_FOR_QUEUE_BACKLOG,
MYSQLDB_PYMYSQL_URL_PREFIX, MYSQLDB_PYMYSQL_URL_PREFIX,
MYSQLDB_URL_PREFIX, MYSQLDB_URL_PREFIX,
QUEUE_PERCENTAGE_ALLOWED_AVAILABLE_MEMORY,
SQLITE_MAX_BIND_VARS, SQLITE_MAX_BIND_VARS,
SQLITE_URL_PREFIX, SQLITE_URL_PREFIX,
STATISTICS_ROWS_SCHEMA_VERSION, STATISTICS_ROWS_SCHEMA_VERSION,
@ -156,6 +155,7 @@ ADJUST_LRU_SIZE_TASK = AdjustLRUSizeTask()
DB_LOCK_TIMEOUT = 30 DB_LOCK_TIMEOUT = 30
DB_LOCK_QUEUE_CHECK_TIMEOUT = 10 # check every 10 seconds DB_LOCK_QUEUE_CHECK_TIMEOUT = 10 # check every 10 seconds
QUEUE_CHECK_INTERVAL = timedelta(minutes=5)
INVALIDATED_ERR = "Database connection invalidated" INVALIDATED_ERR = "Database connection invalidated"
CONNECTIVITY_ERR = "Error in database connectivity during commit" CONNECTIVITY_ERR = "Error in database connectivity during commit"
@ -347,7 +347,7 @@ class Recorder(threading.Thread):
self._queue_watcher = async_track_time_interval( self._queue_watcher = async_track_time_interval(
self.hass, self.hass,
self._async_check_queue, self._async_check_queue,
timedelta(minutes=10), QUEUE_CHECK_INTERVAL,
name="Recorder queue watcher", name="Recorder queue watcher",
) )
@ -387,9 +387,8 @@ class Recorder(threading.Thread):
The queue grows during migration or if something really goes wrong. The queue grows during migration or if something really goes wrong.
""" """
size = self.backlog _LOGGER.debug("Recorder queue size is: %s", self.backlog)
_LOGGER.debug("Recorder queue size is: %s", size) if not self._reached_max_backlog():
if not self._reached_max_backlog_percentage(100):
return return
_LOGGER.error( _LOGGER.error(
( (
@ -408,22 +407,15 @@ class Recorder(threading.Thread):
self._psutil = ha_psutil.PsutilWrapper() self._psutil = ha_psutil.PsutilWrapper()
return cast(int, self._psutil.psutil.virtual_memory().available) return cast(int, self._psutil.psutil.virtual_memory().available)
def _reached_max_backlog_percentage(self, percentage: int) -> bool: def _reached_max_backlog(self) -> bool:
"""Check if the system has reached the max queue backlog and return the maximum if it has.""" """Check if the system has reached the max queue backlog and return True if it has."""
percentage_modifier = percentage / 100
current_backlog = self.backlog
# First check the minimum value since its cheap # First check the minimum value since its cheap
if current_backlog < (MAX_QUEUE_BACKLOG_MIN_VALUE * percentage_modifier): if self.backlog < MAX_QUEUE_BACKLOG_MIN_VALUE:
return False return False
# If they have more RAM available, keep filling the backlog # If they have more RAM available, keep filling the backlog
# since we do not want to stop recording events or give the # since we do not want to stop recording events or give the
# user a bad backup when they have plenty of RAM available. # user a bad backup when they have plenty of RAM available.
max_queue_backlog = int( return self._available_memory() < MIN_AVAILABLE_MEMORY_FOR_QUEUE_BACKLOG
QUEUE_PERCENTAGE_ALLOWED_AVAILABLE_MEMORY
* (self._available_memory() / ESTIMATED_QUEUE_ITEM_SIZE)
)
self.max_backlog = max(max_queue_backlog, MAX_QUEUE_BACKLOG_MIN_VALUE)
return current_backlog >= (max_queue_backlog * percentage_modifier)
@callback @callback
def _async_stop_queue_watcher_and_event_listener(self) -> None: def _async_stop_queue_watcher_and_event_listener(self) -> None:
@ -1019,13 +1011,12 @@ class Recorder(threading.Thread):
# Notify that lock is being held, wait until database can be used again. # Notify that lock is being held, wait until database can be used again.
hass.add_job(_async_set_database_locked, task) hass.add_job(_async_set_database_locked, task)
while not task.database_unlock.wait(timeout=DB_LOCK_QUEUE_CHECK_TIMEOUT): while not task.database_unlock.wait(timeout=DB_LOCK_QUEUE_CHECK_TIMEOUT):
if self._reached_max_backlog_percentage(90): if self._reached_max_backlog():
_LOGGER.warning( _LOGGER.warning(
"Database queue backlog reached more than %s (%s events) of maximum queue " "Database queue backlog reached more than %s events "
"length while waiting for backup to finish; recorder will now " "while waiting for backup to finish; recorder will now "
"resume writing to database. The backup cannot be trusted and " "resume writing to database. The backup cannot be trusted and "
"must be restarted", "must be restarted",
"90%",
self.backlog, self.backlog,
) )
task.queue_overflow = True task.queue_overflow = True

View File

@ -6,6 +6,7 @@ import asyncio
from collections.abc import Generator from collections.abc import Generator
from datetime import datetime, timedelta from datetime import datetime, timedelta
import sqlite3 import sqlite3
import sys
import threading import threading
from typing import Any, cast from typing import Any, cast
from unittest.mock import MagicMock, Mock, patch from unittest.mock import MagicMock, Mock, patch
@ -1883,7 +1884,9 @@ async def test_database_lock_and_overflow(
with ( with (
patch.object(recorder.core, "MAX_QUEUE_BACKLOG_MIN_VALUE", 1), patch.object(recorder.core, "MAX_QUEUE_BACKLOG_MIN_VALUE", 1),
patch.object(recorder.core, "DB_LOCK_QUEUE_CHECK_TIMEOUT", 0.01), patch.object(recorder.core, "DB_LOCK_QUEUE_CHECK_TIMEOUT", 0.01),
patch.object(recorder.core, "QUEUE_PERCENTAGE_ALLOWED_AVAILABLE_MEMORY", 0), patch.object(
recorder.core, "MIN_AVAILABLE_MEMORY_FOR_QUEUE_BACKLOG", sys.maxsize
),
): ):
await async_setup_recorder_instance(hass, config) await async_setup_recorder_instance(hass, config)
await hass.async_block_till_done() await hass.async_block_till_done()
@ -1943,26 +1946,43 @@ async def test_database_lock_and_overflow_checks_available_memory(
) )
) )
await async_setup_recorder_instance(hass, config) with patch(
await hass.async_block_till_done() "homeassistant.components.recorder.core.QUEUE_CHECK_INTERVAL",
timedelta(seconds=1),
):
await async_setup_recorder_instance(hass, config)
await hass.async_block_till_done()
event_type = "EVENT_TEST" event_type = "EVENT_TEST"
event_types = (event_type,) event_types = (event_type,)
await async_wait_recording_done(hass) await async_wait_recording_done(hass)
min_available_memory = 256 * 1024**2
out_of_ram = False
def _get_available_memory(*args: Any, **kwargs: Any) -> int:
nonlocal out_of_ram
return min_available_memory / 2 if out_of_ram else min_available_memory
with ( with (
patch.object(recorder.core, "MAX_QUEUE_BACKLOG_MIN_VALUE", 1), patch.object(recorder.core, "MAX_QUEUE_BACKLOG_MIN_VALUE", 1),
patch.object(recorder.core, "QUEUE_PERCENTAGE_ALLOWED_AVAILABLE_MEMORY", 1), patch.object(
recorder.core,
"MIN_AVAILABLE_MEMORY_FOR_QUEUE_BACKLOG",
min_available_memory,
),
patch.object(recorder.core, "DB_LOCK_QUEUE_CHECK_TIMEOUT", 0.01), patch.object(recorder.core, "DB_LOCK_QUEUE_CHECK_TIMEOUT", 0.01),
patch.object( patch.object(
recorder.core.Recorder, recorder.core.Recorder,
"_available_memory", "_available_memory",
return_value=recorder.core.ESTIMATED_QUEUE_ITEM_SIZE * 4, side_effect=_get_available_memory,
), ),
): ):
instance = get_instance(hass) instance = get_instance(hass)
await instance.lock_database() assert await instance.lock_database()
db_events = await instance.async_add_executor_job(_get_db_events)
assert len(db_events) == 0
# Record up to the extended limit (which takes into account the available memory) # Record up to the extended limit (which takes into account the available memory)
for _ in range(2): for _ in range(2):
event_data = {"test_attr": 5, "test_attr_10": "nice"} event_data = {"test_attr": 5, "test_attr_10": "nice"}
@ -1979,6 +1999,7 @@ async def test_database_lock_and_overflow_checks_available_memory(
assert "Database queue backlog reached more than" not in caplog.text assert "Database queue backlog reached more than" not in caplog.text
out_of_ram = True
# Record beyond the extended limit (which takes into account the available memory) # Record beyond the extended limit (which takes into account the available memory)
for _ in range(20): for _ in range(20):
event_data = {"test_attr": 5, "test_attr_10": "nice"} event_data = {"test_attr": 5, "test_attr_10": "nice"}

View File

@ -292,7 +292,9 @@ async def test_events_during_migration_queue_exhausted(
new=create_engine_test, new=create_engine_test,
), ),
patch.object(recorder.core, "MAX_QUEUE_BACKLOG_MIN_VALUE", 1), patch.object(recorder.core, "MAX_QUEUE_BACKLOG_MIN_VALUE", 1),
patch.object(recorder.core, "QUEUE_PERCENTAGE_ALLOWED_AVAILABLE_MEMORY", 0), patch.object(
recorder.core, "MIN_AVAILABLE_MEMORY_FOR_QUEUE_BACKLOG", sys.maxsize
),
): ):
await async_setup_recorder_instance( await async_setup_recorder_instance(
hass, {"commit_interval": 0}, wait_recorder=False hass, {"commit_interval": 0}, wait_recorder=False

View File

@ -3,6 +3,7 @@
import datetime import datetime
from datetime import timedelta from datetime import timedelta
from statistics import fmean from statistics import fmean
import sys
from unittest.mock import ANY, patch from unittest.mock import ANY, patch
from freezegun import freeze_time from freezegun import freeze_time
@ -2515,7 +2516,9 @@ async def test_recorder_info_migration_queue_exhausted(
new=create_engine_test, new=create_engine_test,
), ),
patch.object(recorder.core, "MAX_QUEUE_BACKLOG_MIN_VALUE", 1), patch.object(recorder.core, "MAX_QUEUE_BACKLOG_MIN_VALUE", 1),
patch.object(recorder.core, "QUEUE_PERCENTAGE_ALLOWED_AVAILABLE_MEMORY", 0), patch.object(
recorder.core, "MIN_AVAILABLE_MEMORY_FOR_QUEUE_BACKLOG", sys.maxsize
),
): ):
async with async_test_recorder(hass, wait_recorder=False): async with async_test_recorder(hass, wait_recorder=False):
await hass.async_add_executor_job( await hass.async_add_executor_job(