Reduce memory pressure during database migration (#69628)

J. Nick Koston, 2022-04-07 18:29:31 -10:00, committed by GitHub
parent 8c00fde27d
commit 66f0a3816a
3 changed files with 164 additions and 37 deletions

File 1 of 3

@@ -579,6 +579,34 @@ class EventTask(RecorderTask):
         instance._process_one_event(self.event)
+
+
+@dataclass
+class KeepAliveTask(RecorderTask):
+    """A keep alive to be sent."""
+
+    commit_before = False
+
+    def run(self, instance: Recorder) -> None:
+        """Handle the task."""
+        # pylint: disable-next=[protected-access]
+        instance._send_keep_alive()
+
+
+@dataclass
+class CommitTask(RecorderTask):
+    """Commit the event session."""
+
+    commit_before = False
+
+    def run(self, instance: Recorder) -> None:
+        """Handle the task."""
+        # pylint: disable-next=[protected-access]
+        instance._commit_event_session_or_retry()
+
+
+COMMIT_TASK = CommitTask()
+KEEP_ALIVE_TASK = KeepAliveTask()
+
+
 class Recorder(threading.Thread):
     """A threaded recorder class."""
@@ -621,9 +649,7 @@ class Recorder(threading.Thread):
         self.entity_filter = entity_filter
         self.exclude_t = exclude_t
-        self._timechanges_seen = 0
         self._commits_without_expire = 0
-        self._keepalive_count = 0
         self._old_states: dict[str, States] = {}
         self._state_attributes_ids: LRU = LRU(STATE_ATTRIBUTES_ID_CACHE_SIZE)
         self._pending_state_attributes: dict[str, StateAttributes] = {}
@@ -640,6 +666,10 @@ class Recorder(threading.Thread):
         self._db_executor: DBInterruptibleThreadPoolExecutor | None = None
         self._exclude_attributes_by_domain = exclude_attributes_by_domain
+        self._keep_alive_listener: CALLBACK_TYPE | None = None
+        self._commit_listener: CALLBACK_TYPE | None = None
+        self._periodic_listener: CALLBACK_TYPE | None = None
+        self._nightly_listener: CALLBACK_TYPE | None = None
         self.enabled = True

     def set_enable(self, enable: bool) -> None:
@@ -670,6 +700,22 @@ class Recorder(threading.Thread):
             self.hass, self._async_check_queue, timedelta(minutes=10)
         )

+    @callback
+    def _async_keep_alive(self, now: datetime) -> None:
+        """Queue a keep alive."""
+        if self._event_listener:
+            self.queue.put(KEEP_ALIVE_TASK)
+
+    @callback
+    def _async_commit(self, now: datetime) -> None:
+        """Queue a commit."""
+        if (
+            self._event_listener
+            and not self._database_lock_task
+            and self._event_session_has_pending_writes()
+        ):
+            self.queue.put(COMMIT_TASK)
+
     @callback
     def async_add_executor_job(
         self, target: Callable[..., T], *args: Any
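
The commit callback only enqueues COMMIT_TASK when the event listener is still attached, no database lock task is active, and the session actually has pending writes, so an idle timer generates no queue traffic. A rough asyncio sketch of a timer that queues a sentinel only when there is work (illustrative names, not Home Assistant APIs):

import asyncio
from queue import SimpleQueue

COMMIT = object()  # sentinel task object


async def schedule_commits(queue, has_pending_writes, interval: float) -> None:
    """Periodically enqueue a commit, but only when there is work to flush."""
    while True:
        await asyncio.sleep(interval)
        if has_pending_writes():
            queue.put(COMMIT)


async def main() -> None:
    queue: SimpleQueue = SimpleQueue()
    pending = {"writes": 3}
    timer = asyncio.create_task(
        schedule_commits(queue, lambda: pending["writes"] > 0, interval=0.05)
    )
    await asyncio.sleep(0.2)
    timer.cancel()
    try:
        await timer
    except asyncio.CancelledError:
        pass
    print("commits queued:", queue.qsize())


asyncio.run(main())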
@@ -709,9 +755,29 @@ class Recorder(threading.Thread):
             self._event_listener()
             self._event_listener = None

+    @callback
+    def _async_stop_listeners(self) -> None:
+        """Stop listeners."""
+        self._async_stop_queue_watcher_and_event_listener()
+        if self._keep_alive_listener:
+            self._keep_alive_listener()
+            self._keep_alive_listener = None
+        if self._commit_listener:
+            self._commit_listener()
+            self._commit_listener = None
+        if self._nightly_listener:
+            self._nightly_listener()
+            self._nightly_listener = None
+        if self._periodic_listener:
+            self._periodic_listener()
+            self._periodic_listener = None
+
     @callback
     def _async_event_filter(self, event: Event) -> bool:
         """Filter events."""
+        if event.event_type == EVENT_TIME_CHANGED:
+            return False
         if event.event_type in self.exclude_t:
             return False
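
Each async_track_* helper used in this file returns an unsubscribe callable; _async_stop_listeners invokes every stored handle and resets it to None so that a second shutdown is a harmless no-op. The guard-then-clear pattern in isolation (a generic sketch):

from typing import Callable, Optional


class Listeners:
    def __init__(self) -> None:
        self._unsub: Optional[Callable[[], None]] = None

    def subscribe(self, unsubscribe: Callable[[], None]) -> None:
        self._unsub = unsubscribe

    def stop(self) -> None:
        # Guard, call, then clear: safe to invoke any number of times.
        if self._unsub:
            self._unsub()
            self._unsub = None


listeners = Listeners()
listeners.subscribe(lambda: print("unsubscribed"))
listeners.stop()  # prints "unsubscribed"
listeners.stop()  # no-op on the second call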
@@ -780,7 +846,7 @@ class Recorder(threading.Thread):
             if not self._hass_started.done():
                 self._hass_started.set_result(SHUTDOWN_TASK)
             self.queue.put(StopTask())
-            self._async_stop_queue_watcher_and_event_listener()
+            self._async_stop_listeners()
             await self.hass.async_add_executor_job(self.join)

         self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _async_shutdown)
@@ -807,7 +873,7 @@ class Recorder(threading.Thread):
                 "The recorder could not start, check [the logs](/config/logs)",
                 "Recorder",
             )
-            self._async_stop_queue_watcher_and_event_listener()
+            self._async_stop_listeners()

     @callback
     def async_connection_success(self) -> None:
@@ -864,6 +930,10 @@ class Recorder(threading.Thread):
         """Schedule external statistics."""
         self.queue.put(ExternalStatisticsTask(metadata, stats))

+    @callback
+    def _using_sqlite(self) -> bool:
+        return bool(self.engine and self.engine.dialect.name == "sqlite")
+
     @callback
     def _async_setup_periodic_tasks(self) -> None:
         """Prepare periodic tasks."""
@@ -871,13 +941,26 @@ class Recorder(threading.Thread):
             # Home Assistant is shutting down
             return

+        # If the db is using a socket connection, we need to keep alive
+        # to prevent errors from unexpected disconnects
+        if not self._using_sqlite():
+            self._keep_alive_listener = async_track_time_interval(
+                self.hass, self._async_keep_alive, timedelta(seconds=KEEPALIVE_TIME)
+            )
+
+        # If the commit interval is not 0, we need to commit periodically
+        if self.commit_interval:
+            self._commit_listener = async_track_time_interval(
+                self.hass, self._async_commit, timedelta(seconds=self.commit_interval)
+            )
+
         # Run nightly tasks at 4:12am
-        async_track_time_change(
+        self._nightly_listener = async_track_time_change(
             self.hass, self.async_nightly_tasks, hour=4, minute=12, second=0
         )

         # Compile short term statistics every 5 minutes
-        async_track_utc_time_change(
+        self._periodic_listener = async_track_utc_time_change(
             self.hass, self.async_periodic_statistics, minute=range(0, 60, 5), second=10
         )
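
Socket-based databases such as MySQL can drop connections that sit idle, hence the periodic keep alive; _send_keep_alive itself is outside this diff. A common way to keep a connection warm is to issue a trivial query on an interval, sketched here as an assumption rather than as the recorder's actual implementation:

import time

from sqlalchemy import create_engine, text

engine = create_engine("sqlite://")  # stands in for a socket-based database


def send_keep_alive(connection) -> None:
    """Issue a trivial query so the server does not drop an idle connection."""
    connection.execute(text("SELECT 1"))


with engine.connect() as connection:
    for _ in range(3):
        send_keep_alive(connection)
        time.sleep(0.1)  # KEEPALIVE_TIME is far larger in practice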
@@ -947,6 +1030,7 @@ class Recorder(threading.Thread):
         self.stop_requested = False
         while not self.stop_requested:
             task = self.queue.get()
+            _LOGGER.debug("Processing task: %s", task)
             try:
                 self._process_one_task_or_recover(task)
             except Exception as err:  # pylint: disable=broad-except
@@ -1050,20 +1134,14 @@ class Recorder(threading.Thread):
         )

     def _process_one_event(self, event: Event) -> None:
-        if event.event_type == EVENT_TIME_CHANGED:
-            self._keepalive_count += 1
-            if self._keepalive_count >= KEEPALIVE_TIME:
-                self._keepalive_count = 0
-                self._send_keep_alive()
-            if self.commit_interval:
-                self._timechanges_seen += 1
-                if self._timechanges_seen >= self.commit_interval:
-                    self._timechanges_seen = 0
-                    self._commit_event_session_or_retry()
-            return
-
         if not self.enabled:
             return
         self._process_event_into_session(event)
+        # Commit if the commit interval is zero
+        if not self.commit_interval:
+            self._commit_event_session_or_retry()

     def _process_event_into_session(self, event: Event) -> None:
         assert self.event_session is not None
         try:
@@ -1129,12 +1207,8 @@ class Recorder(threading.Thread):
                     self._pending_expunge.append(dbstate)
             else:
                 dbstate.state = None
-            self.event_session.add(dbstate)
             dbstate.event = dbevent
-            # Commit right away if the commit interval is zero
-            if not self.commit_interval:
-                self._commit_event_session_or_retry()
+            self.event_session.add(dbstate)

     def _handle_database_error(self, err: Exception) -> bool:
         """Handle a database error that may result in moving away the corrupt db."""
@@ -1146,11 +1220,14 @@ class Recorder(threading.Thread):
             return True
         return False

+    def _event_session_has_pending_writes(self) -> bool:
+        return bool(
+            self.event_session and (self.event_session.new or self.event_session.dirty)
+        )
+
     def _commit_event_session_or_retry(self) -> None:
         """Commit the event session if there is work to do."""
-        if not self.event_session or (
-            not self.event_session.new and not self.event_session.dirty
-        ):
+        if not self._event_session_has_pending_writes():
             return
         tries = 1
         while tries <= self.db_max_retries:
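
A SQLAlchemy session exposes objects pending INSERT as session.new and pending UPDATE as session.dirty, which is exactly what _event_session_has_pending_writes inspects before committing. A runnable sketch (the model and table names are illustrative):

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Event(Base):
    __tablename__ = "events"
    id = Column(Integer, primary_key=True)
    event_type = Column(String)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    def has_pending_writes() -> bool:
        # session.new: pending INSERTs; session.dirty: pending UPDATEs
        return bool(session.new or session.dirty)

    print(has_pending_writes())  # False: nothing queued yet
    session.add(Event(event_type="state_changed"))
    print(has_pending_writes())  # True: one pending INSERT
    session.commit()
    print(has_pending_writes())  # False again after the commit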
@@ -1197,7 +1274,7 @@ class Recorder(threading.Thread):
         # Expire is an expensive operation (frequently more expensive
         # than the flush and commit itself) so we only
         # do it after EXPIRE_AFTER_COMMITS commits
-        if self._commits_without_expire == EXPIRE_AFTER_COMMITS:
+        if self._commits_without_expire >= EXPIRE_AFTER_COMMITS:
             self._commits_without_expire = 0
             self.event_session.expire_all()
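
Relaxing == to >= lets the periodic expire still fire if the counter ever overshoots. expire_all marks every object tracked by the session as stale, releasing its cached attribute state; the next attribute access reloads from the database. A brief sketch of that effect, assuming a session created with expire_on_commit=False (without which commit would already expire everything):

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class State(Base):
    __tablename__ = "states"
    id = Column(Integer, primary_key=True)
    value = Column(String)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine, expire_on_commit=False) as session:
    state = State(value="on")
    session.add(state)
    session.commit()  # with expire_on_commit=False, cached values survive

    session.expire_all()  # drop cached attribute values for every tracked object
    print(state.value)  # "on", transparently reloaded via a fresh SELECT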
@@ -1266,7 +1343,7 @@ class Recorder(threading.Thread):
     async def lock_database(self) -> bool:
         """Lock database so it can be backed up safely."""
-        if not self.engine or self.engine.dialect.name != "sqlite":
+        if not self._using_sqlite():
             _LOGGER.debug(
                 "Not a SQLite database or not connected, locking not necessary"
             )
@@ -1295,7 +1372,7 @@ class Recorder(threading.Thread):
         Returns true if database lock has been held throughout the process.
         """
-        if not self.engine or self.engine.dialect.name != "sqlite":
+        if not self._using_sqlite():
             _LOGGER.debug(
                 "Not a SQLite database or not connected, unlocking not necessary"
             )
@@ -1414,7 +1491,7 @@ class Recorder(threading.Thread):
     def _shutdown(self) -> None:
         """Save end time for current run."""
-        self.hass.add_job(self._async_stop_queue_watcher_and_event_listener)
+        self.hass.add_job(self._async_stop_listeners)
         self._stop_executor()
         self._end_session()
         self._close_connection()

File 2 of 3

@@ -13,6 +13,7 @@ from homeassistant.components import recorder
 from homeassistant.components.recorder import (
     CONF_AUTO_PURGE,
     CONF_AUTO_REPACK,
+    CONF_COMMIT_INTERVAL,
     CONF_DB_URL,
     CONFIG_SCHEMA,
     DOMAIN,
@@ -186,7 +187,7 @@ async def test_saving_many_states(
 ):
     """Test we expire after many commits."""
     instance = await async_setup_recorder_instance(
-        hass, {recorder.CONF_COMMIT_INTERVAL: 1}
+        hass, {recorder.CONF_COMMIT_INTERVAL: 0}
     )

     entity_id = "test.recorder"
@@ -825,7 +826,7 @@ def test_auto_purge_disabled(hass_recorder):
     dt_util.set_default_time_zone(tz)

     # Purging is scheduled to happen at 4:12am every day. We want
-    # to verify that when auto purge is disabled perodic db cleanups
+    # to verify that when auto purge is disabled periodic db cleanups
     # are still scheduled
     #
     # The clock is started at 4:15am then advanced forward below
@@ -1208,7 +1209,9 @@ async def test_database_corruption_while_running(hass, tmpdir, caplog):
     test_db_file = await hass.async_add_executor_job(_create_tmpdir_for_test_db)
     dburl = f"{SQLITE_URL_PREFIX}//{test_db_file}"

-    assert await async_setup_component(hass, DOMAIN, {DOMAIN: {CONF_DB_URL: dburl}})
+    assert await async_setup_component(
+        hass, DOMAIN, {DOMAIN: {CONF_DB_URL: dburl, CONF_COMMIT_INTERVAL: 0}}
+    )
     await hass.async_block_till_done()
     caplog.clear()
@@ -1299,7 +1302,10 @@ def test_entity_id_filter(hass_recorder):
 async def test_database_lock_and_unlock(hass: HomeAssistant, tmp_path):
     """Test writing events during lock getting written after unlocking."""
     # Use file DB, in memory DB cannot do write locks.
-    config = {recorder.CONF_DB_URL: "sqlite:///" + str(tmp_path / "pytest.db")}
+    config = {
+        recorder.CONF_COMMIT_INTERVAL: 0,
+        recorder.CONF_DB_URL: "sqlite:///" + str(tmp_path / "pytest.db"),
+    }
     await async_init_recorder_component(hass, config)
     await hass.async_block_till_done()
@@ -1311,7 +1317,7 @@ async def test_database_lock_and_unlock(hass: HomeAssistant, tmp_path):
     event_type = "EVENT_TEST"
     event_data = {"test_attr": 5, "test_attr_10": "nice"}
-    hass.bus.fire(event_type, event_data)
+    hass.bus.async_fire(event_type, event_data)
     task = asyncio.create_task(async_wait_recording_done(hass, instance))

     # Recording can't be finished while lock is held
@@ -1333,7 +1339,10 @@ async def test_database_lock_and_unlock(hass: HomeAssistant, tmp_path):
 async def test_database_lock_and_overflow(hass: HomeAssistant, tmp_path):
     """Test writing events during lock leading to overflow the queue causes the database to unlock."""
     # Use file DB, in memory DB cannot do write locks.
-    config = {recorder.CONF_DB_URL: "sqlite:///" + str(tmp_path / "pytest.db")}
+    config = {
+        recorder.CONF_COMMIT_INTERVAL: 0,
+        recorder.CONF_DB_URL: "sqlite:///" + str(tmp_path / "pytest.db"),
+    }
     await async_init_recorder_component(hass, config)
     await hass.async_block_till_done()
@@ -1402,3 +1411,42 @@ async def test_in_memory_database(hass, caplog):
         hass, recorder.DOMAIN, {recorder.DOMAIN: {recorder.CONF_DB_URL: "sqlite://"}}
     )
     assert "In-memory SQLite database is not supported" in caplog.text
+
+
+async def test_database_connection_keep_alive(
+    hass: HomeAssistant,
+    async_setup_recorder_instance: SetupRecorderInstanceT,
+    caplog: pytest.LogCaptureFixture,
+):
+    """Test we keep alive socket based dialects."""
+    with patch(
+        "homeassistant.components.recorder.Recorder._using_sqlite", return_value=False
+    ):
+        instance = await async_setup_recorder_instance(hass)
+        # We have to mock this since we don't have a mock
+        # MySQL server available in tests.
+        hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
+        await instance.async_recorder_ready.wait()
+
+        async_fire_time_changed(
+            hass, dt_util.utcnow() + timedelta(seconds=recorder.KEEPALIVE_TIME)
+        )
+        await async_wait_recording_done(hass, instance)
+    assert "Sending keepalive" in caplog.text
+
+
+async def test_database_connection_keep_alive_disabled_on_sqlite(
+    hass: HomeAssistant,
+    async_setup_recorder_instance: SetupRecorderInstanceT,
+    caplog: pytest.LogCaptureFixture,
+):
+    """Test we do not do keep alive for sqlite."""
+    instance = await async_setup_recorder_instance(hass)
+    hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
+    await instance.async_recorder_ready.wait()
+
+    async_fire_time_changed(
+        hass, dt_util.utcnow() + timedelta(seconds=recorder.KEEPALIVE_TIME)
+    )
+    await async_wait_recording_done(hass, instance)
+    assert "Sending keepalive" not in caplog.text

File 3 of 3

@@ -206,7 +206,9 @@ async def test_events_during_migration_queue_exhausted(hass):
         "homeassistant.components.recorder.create_engine", new=create_engine_test
     ), patch.object(recorder, "MAX_QUEUE_BACKLOG", 1):
         await async_setup_component(
-            hass, "recorder", {"recorder": {"db_url": "sqlite://"}}
+            hass,
+            "recorder",
+            {"recorder": {"db_url": "sqlite://", "commit_interval": 0}},
         )
         hass.states.async_set("my.entity", "on", {})
         await hass.async_block_till_done()
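
Patching MAX_QUEUE_BACKLOG to 1 exhausts the recorder queue after a single backlogged event while the migration blocks processing; per the first file, the recorder enforces its limit with a periodic _async_check_queue size check rather than a bounded queue. Purely as an illustration, a bounded standard-library queue shows what exhaustion looks like:

import queue

backlog = queue.Queue(maxsize=1)  # stands in for a backlog limit of 1
backlog.put_nowait("event 1")
try:
    backlog.put_nowait("event 2")  # the second event exceeds the backlog
except queue.Full:
    print("queue exhausted")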