Dynamically size recorder max backlog based on available memory (#90894)

Co-authored-by: Franck Nijhof <git@frenck.dev>
J. Nick Koston 2023-04-18 00:35:49 -10:00 committed by GitHub
parent f49dc65ff2
commit 2ec1359063
9 changed files with 151 additions and 29 deletions

View File

@@ -19,7 +19,9 @@ EVENT_RECORDER_HOURLY_STATISTICS_GENERATED = "recorder_hourly_statistics_generat
 CONF_DB_INTEGRITY_CHECK = "db_integrity_check"

-MAX_QUEUE_BACKLOG = 65000
+MAX_QUEUE_BACKLOG_MIN_VALUE = 65000
+ESTIMATED_QUEUE_ITEM_SIZE = 10240
+QUEUE_PERCENTAGE_ALLOWED_AVAILABLE_MEMORY = 0.65

 # The maximum number of rows (events) we purge in one delete statement
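
For a sense of scale (an illustration added here, not part of the commit): the new limit is 65% of available memory divided by the ~10 KiB estimated size of a queued item, and it never drops below the previous fixed value of 65000.

# Illustration only, using a hypothetical standalone helper.
MAX_QUEUE_BACKLOG_MIN_VALUE = 65000
ESTIMATED_QUEUE_ITEM_SIZE = 10240  # rough estimate, bytes per queued event
QUEUE_PERCENTAGE_ALLOWED_AVAILABLE_MEMORY = 0.65


def dynamic_max_backlog(available_bytes: int) -> int:
    """Return the backlog limit implied by the given amount of free memory."""
    dynamic = int(
        QUEUE_PERCENTAGE_ALLOWED_AVAILABLE_MEMORY
        * (available_bytes / ESTIMATED_QUEUE_ITEM_SIZE)
    )
    return max(dynamic, MAX_QUEUE_BACKLOG_MIN_VALUE)


print(dynamic_max_backlog(1 * 1024**3))    # ~1 GiB free -> 68157 events
print(dynamic_max_backlog(8 * 1024**3))    # ~8 GiB free -> 545259 events
print(dynamic_max_backlog(256 * 1024**2))  # low-memory host -> floor of 65000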

View File

@@ -11,9 +11,10 @@ import queue
 import sqlite3
 import threading
 import time
-from typing import Any, TypeVar
+from typing import Any, TypeVar, cast

 import async_timeout
+import psutil_home_assistant as ha_psutil
 from sqlalchemy import create_engine, event as sqlalchemy_event, exc, select
 from sqlalchemy.engine import Engine
 from sqlalchemy.engine.interfaces import DBAPIConnection
@@ -45,14 +46,16 @@ from .const import (
     CONTEXT_ID_AS_BINARY_SCHEMA_VERSION,
     DB_WORKER_PREFIX,
     DOMAIN,
+    ESTIMATED_QUEUE_ITEM_SIZE,
     EVENT_TYPE_IDS_SCHEMA_VERSION,
     KEEPALIVE_TIME,
     LEGACY_STATES_EVENT_ID_INDEX_SCHEMA_VERSION,
     MARIADB_PYMYSQL_URL_PREFIX,
     MARIADB_URL_PREFIX,
-    MAX_QUEUE_BACKLOG,
+    MAX_QUEUE_BACKLOG_MIN_VALUE,
     MYSQLDB_PYMYSQL_URL_PREFIX,
     MYSQLDB_URL_PREFIX,
+    QUEUE_PERCENTAGE_ALLOWED_AVAILABLE_MEMORY,
     SQLITE_URL_PREFIX,
     STATES_META_SCHEMA_VERSION,
     STATISTICS_ROWS_SCHEMA_VERSION,
@@ -148,7 +151,7 @@ WAIT_TASK = WaitTask()
 ADJUST_LRU_SIZE_TASK = AdjustLRUSizeTask()

 DB_LOCK_TIMEOUT = 30
-DB_LOCK_QUEUE_CHECK_TIMEOUT = 1
+DB_LOCK_QUEUE_CHECK_TIMEOUT = 10  # check every 10 seconds

 INVALIDATED_ERR = "Database connection invalidated"
@@ -201,6 +204,8 @@ class Recorder(threading.Thread):
         self.async_recorder_ready = asyncio.Event()
         self._queue_watch = threading.Event()
         self.engine: Engine | None = None
+        self.max_backlog: int = MAX_QUEUE_BACKLOG_MIN_VALUE
+        self._psutil: ha_psutil.PsutilWrapper | None = None

         # The entity_filter is exposed on the recorder instance so that
         # it can be used to see if an entity is being recorded and is called
@@ -343,7 +348,7 @@ class Recorder(threading.Thread):
         """
         size = self.backlog
         _LOGGER.debug("Recorder queue size is: %s", size)
-        if size <= MAX_QUEUE_BACKLOG:
+        if not self._reached_max_backlog_percentage(100):
             return
         _LOGGER.error(
             (
@@ -352,10 +357,33 @@ class Recorder(threading.Thread):
                 "is corrupt due to a disk problem; The recorder will stop "
                 "recording events to avoid running out of memory"
             ),
-            MAX_QUEUE_BACKLOG,
+            self.backlog,
         )
         self._async_stop_queue_watcher_and_event_listener()

+    def _available_memory(self) -> int:
+        """Return the available memory in bytes."""
+        if not self._psutil:
+            self._psutil = ha_psutil.PsutilWrapper()
+        return cast(int, self._psutil.psutil.virtual_memory().available)
+
+    def _reached_max_backlog_percentage(self, percentage: int) -> bool:
+        """Check if the system has reached the max queue backlog and return the maximum if it has."""
+        percentage_modifier = percentage / 100
+        current_backlog = self.backlog
+        # First check the minimum value since its cheap
+        if current_backlog < (MAX_QUEUE_BACKLOG_MIN_VALUE * percentage_modifier):
+            return False
+        # If they have more RAM available, keep filling the backlog
+        # since we do not want to stop recording events or give the
+        # user a bad backup when they have plenty of RAM available.
+        max_queue_backlog = int(
+            QUEUE_PERCENTAGE_ALLOWED_AVAILABLE_MEMORY
+            * (self._available_memory() / ESTIMATED_QUEUE_ITEM_SIZE)
+        )
+        self.max_backlog = max(max_queue_backlog, MAX_QUEUE_BACKLOG_MIN_VALUE)
+        return current_backlog >= (max_queue_backlog * percentage_modifier)
+
     @callback
     def _async_stop_queue_watcher_and_event_listener(self) -> None:
         """Stop watching the queue and listening for events."""
@@ -705,8 +733,8 @@ class Recorder(threading.Thread):
                 self.schema_version = SCHEMA_VERSION
                 if not self._event_listener:
                     # If the schema migration takes so long that the end
-                    # queue watcher safety kicks in because MAX_QUEUE_BACKLOG
-                    # is reached, we need to reinitialize the listener.
+                    # queue watcher safety kicks in because _reached_max_backlog
+                    # was True, we need to reinitialize the listener.
                     self.hass.add_job(self.async_initialize)
             else:
                 persistent_notification.create(
@@ -946,12 +974,14 @@ class Recorder(threading.Thread):
         # Notify that lock is being held, wait until database can be used again.
         self.hass.add_job(_async_set_database_locked, task)
         while not task.database_unlock.wait(timeout=DB_LOCK_QUEUE_CHECK_TIMEOUT):
-            if self.backlog > MAX_QUEUE_BACKLOG * 0.9:
+            if self._reached_max_backlog_percentage(90):
                 _LOGGER.warning(
-                    "Database queue backlog reached more than 90% of maximum queue "
+                    "Database queue backlog reached more than %s (%s events) of maximum queue "
                     "length while waiting for backup to finish; recorder will now "
                     "resume writing to database. The backup cannot be trusted and "
-                    "must be restarted"
+                    "must be restarted",
+                    "90%",
+                    self.backlog,
                 )
                 task.queue_overflow = True
                 break
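
Rough sketch (simplified names, not the recorder's actual method) of the loop above: while a backup holds the database lock, poll the unlock event at the new 10-second interval and abort the backup once the queue passes 90% of the dynamic limit.

import threading
from typing import Callable


def wait_for_unlock(
    database_unlock: threading.Event,
    reached_max_backlog_percentage: Callable[[int], bool],
    check_timeout: float = 10.0,  # DB_LOCK_QUEUE_CHECK_TIMEOUT
) -> bool:
    """Return True if the lock was released, False if the queue overflowed."""
    while not database_unlock.wait(timeout=check_timeout):
        if reached_max_backlog_percentage(90):
            # Queue is ~90% full: resume writing; the backup can no longer
            # be trusted and must be restarted.
            return False
    return True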

View File

@@ -6,5 +6,9 @@
   "integration_type": "system",
   "iot_class": "local_push",
   "quality_scale": "internal",
-  "requirements": ["sqlalchemy==2.0.9", "fnv-hash-fast==0.3.1"]
+  "requirements": [
+    "sqlalchemy==2.0.9",
+    "fnv-hash-fast==0.3.1",
+    "psutil-home-assistant==0.0.1"
+  ]
 }

View File

@@ -30,7 +30,6 @@ from homeassistant.util.unit_conversion import (
     VolumeConverter,
 )

-from .const import MAX_QUEUE_BACKLOG
 from .models import StatisticPeriod
 from .statistics import (
     STATISTIC_UNIT_TO_UNIT_CONVERTER,
@@ -504,7 +503,7 @@ def ws_info(
     recorder_info = {
         "backlog": backlog,
-        "max_backlog": MAX_QUEUE_BACKLOG,
+        "max_backlog": instance.max_backlog,
         "migration_in_progress": migration_in_progress,
         "migration_is_live": migration_is_live,
         "recording": recording,

View File

@@ -1401,6 +1401,7 @@ prometheus_client==0.7.1
 proxmoxer==2.0.1

 # homeassistant.components.hardware
+# homeassistant.components.recorder
 psutil-home-assistant==0.0.1

 # homeassistant.components.systemmonitor

View File

@@ -1034,6 +1034,7 @@ progettihwsw==0.1.1
 prometheus_client==0.7.1

 # homeassistant.components.hardware
+# homeassistant.components.recorder
 psutil-home-assistant==0.0.1

 # homeassistant.components.androidtv

View File

@@ -450,7 +450,7 @@ async def test_force_shutdown_with_queue_of_writes_that_generate_exceptions(
     await async_wait_recording_done(hass)

-    with patch.object(instance, "db_retry_wait", 0.05), patch.object(
+    with patch.object(instance, "db_retry_wait", 0.01), patch.object(
         instance.event_session,
         "flush",
         side_effect=OperationalError(
@@ -1757,7 +1757,7 @@ async def test_database_lock_and_unlock(
     # Recording can't be finished while lock is held
     with pytest.raises(asyncio.TimeoutError):
-        await asyncio.wait_for(asyncio.shield(task), timeout=1)
+        await asyncio.wait_for(asyncio.shield(task), timeout=0.25)

     db_events = await hass.async_add_executor_job(_get_db_events)
     assert len(db_events) == 0
@@ -1773,11 +1773,12 @@ async def test_database_lock_and_overflow(
     hass: HomeAssistant,
     recorder_db_url: str,
     tmp_path: Path,
+    caplog: pytest.LogCaptureFixture,
 ) -> None:
     """Test writing events during lock leading to overflow the queue causes the database to unlock."""
     if recorder_db_url.startswith(("mysql://", "postgresql://")):
         # Database locking is only used for SQLite
-        return
+        return pytest.skip("Database locking is only used for SQLite")

     # Use file DB, in memory DB cannot do write locks.
     if recorder_db_url == "sqlite://":
@@ -1787,10 +1788,6 @@ async def test_database_lock_and_overflow(
         recorder.CONF_COMMIT_INTERVAL: 0,
         recorder.CONF_DB_URL: recorder_db_url,
     }
-    await async_setup_recorder_instance(hass, config)
-    await hass.async_block_till_done()
-    event_type = "EVENT_TEST"
-    event_types = (event_type,)

     def _get_db_events():
         with session_scope(hass=hass, read_only=True) as session:
@@ -1800,26 +1797,110 @@
             )
         )

-    instance = get_instance(hass)
-
-    with patch.object(recorder.core, "MAX_QUEUE_BACKLOG", 1), patch.object(
-        recorder.core, "DB_LOCK_QUEUE_CHECK_TIMEOUT", 0.1
-    ):
+    with patch.object(recorder.core, "MAX_QUEUE_BACKLOG_MIN_VALUE", 1), patch.object(
+        recorder.core, "DB_LOCK_QUEUE_CHECK_TIMEOUT", 0.01
+    ), patch.object(recorder.core, "QUEUE_PERCENTAGE_ALLOWED_AVAILABLE_MEMORY", 0):
+        await async_setup_recorder_instance(hass, config)
+        await hass.async_block_till_done()
+        event_type = "EVENT_TEST"
+        event_types = (event_type,)
+
+        instance = get_instance(hass)
+
         await instance.lock_database()

         event_data = {"test_attr": 5, "test_attr_10": "nice"}
-        hass.bus.fire(event_type, event_data)
+        hass.bus.async_fire(event_type, event_data)

         # Check that this causes the queue to overflow and write succeeds
         # even before unlocking.
         await async_wait_recording_done(hass)

-        db_events = await hass.async_add_executor_job(_get_db_events)
+        db_events = await instance.async_add_executor_job(_get_db_events)
         assert len(db_events) == 1

+        assert "Database queue backlog reached more than" in caplog.text
         assert not instance.unlock_database()


+async def test_database_lock_and_overflow_checks_available_memory(
+    async_setup_recorder_instance: RecorderInstanceGenerator,
+    hass: HomeAssistant,
+    recorder_db_url: str,
+    tmp_path: Path,
+    caplog: pytest.LogCaptureFixture,
+) -> None:
+    """Test writing events during lock leading to overflow the queue causes the database to unlock."""
+    if recorder_db_url.startswith(("mysql://", "postgresql://")):
+        return pytest.skip("Database locking is only used for SQLite")
+
+    # Use file DB, in memory DB cannot do write locks.
+    if recorder_db_url == "sqlite://":
+        # Use file DB, in memory DB cannot do write locks.
+        recorder_db_url = "sqlite:///" + str(tmp_path / "pytest.db")
+    config = {
+        recorder.CONF_COMMIT_INTERVAL: 0,
+        recorder.CONF_DB_URL: recorder_db_url,
+    }
+
+    def _get_db_events():
+        with session_scope(hass=hass, read_only=True) as session:
+            return list(
+                session.query(Events).filter(
+                    Events.event_type_id.in_(select_event_type_ids(event_types))
+                )
+            )
+
+    await async_setup_recorder_instance(hass, config)
+    await hass.async_block_till_done()
+    event_type = "EVENT_TEST"
+    event_types = (event_type,)
+    await async_wait_recording_done(hass)
+
+    with patch.object(recorder.core, "MAX_QUEUE_BACKLOG_MIN_VALUE", 1), patch.object(
+        recorder.core, "QUEUE_PERCENTAGE_ALLOWED_AVAILABLE_MEMORY", 1
+    ), patch.object(recorder.core, "DB_LOCK_QUEUE_CHECK_TIMEOUT", 0.01), patch.object(
+        recorder.core.Recorder,
+        "_available_memory",
+        return_value=recorder.core.ESTIMATED_QUEUE_ITEM_SIZE * 4,
+    ):
+        instance = get_instance(hass)
+
+        await instance.lock_database()
+
+        # Record up to the extended limit (which takes into account the available memory)
+        for _ in range(2):
+            event_data = {"test_attr": 5, "test_attr_10": "nice"}
+            hass.bus.async_fire(event_type, event_data)
+
+        def _wait_database_unlocked():
+            return instance._database_lock_task.database_unlock.wait(0.2)
+
+        databack_unlocked = await hass.async_add_executor_job(_wait_database_unlocked)
+        assert not databack_unlocked
+
+        db_events = await instance.async_add_executor_job(_get_db_events)
+        assert len(db_events) == 0
+
+        assert "Database queue backlog reached more than" not in caplog.text
+
+        # Record beyond the extended limit (which takes into account the available memory)
+        for _ in range(20):
+            event_data = {"test_attr": 5, "test_attr_10": "nice"}
+            hass.bus.async_fire(event_type, event_data)
+
+        # Check that this causes the queue to overflow and write succeeds
+        # even before unlocking.
+        await async_wait_recording_done(hass)
+
+        assert not instance.unlock_database()
+        assert "Database queue backlog reached more than" in caplog.text
+
+        db_events = await instance.async_add_executor_job(_get_db_events)
+        assert len(db_events) >= 2
+
+
 async def test_database_lock_timeout(
     recorder_mock: Recorder, hass: HomeAssistant, recorder_db_url: str
 ) -> None:

View File

@@ -259,7 +259,9 @@ async def test_events_during_migration_queue_exhausted(
     with patch("homeassistant.components.recorder.ALLOW_IN_MEMORY_DB", True), patch(
         "homeassistant.components.recorder.core.create_engine",
         new=create_engine_test,
-    ), patch.object(recorder.core, "MAX_QUEUE_BACKLOG", 1):
+    ), patch.object(recorder.core, "MAX_QUEUE_BACKLOG_MIN_VALUE", 1), patch.object(
+        recorder.core, "QUEUE_PERCENTAGE_ALLOWED_AVAILABLE_MEMORY", 0
+    ):
         recorder_helper.async_initialize_recorder(hass)
         await async_setup_component(
             hass,

View File

@@ -2203,7 +2203,9 @@ async def test_recorder_info_migration_queue_exhausted(
         "homeassistant.components.recorder.core.create_engine",
         new=create_engine_test,
     ), patch.object(
-        recorder.core, "MAX_QUEUE_BACKLOG", 1
+        recorder.core, "MAX_QUEUE_BACKLOG_MIN_VALUE", 1
+    ), patch.object(
+        recorder.core, "QUEUE_PERCENTAGE_ALLOWED_AVAILABLE_MEMORY", 0
     ), patch(
         "homeassistant.components.recorder.migration._apply_update",
         wraps=stalled_migration,