Reduce orm overhead by grouping object expiration (#41394)

This commit is contained in:
J. Nick Koston 2020-10-07 08:35:48 -05:00 committed by GitHub
parent 0dabf4962e
commit 113d738fa2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 31 additions and 13 deletions

View File

@ -58,8 +58,13 @@ DEFAULT_DB_FILE = "home-assistant_v2.db"
DEFAULT_DB_INTEGRITY_CHECK = True DEFAULT_DB_INTEGRITY_CHECK = True
DEFAULT_DB_MAX_RETRIES = 10 DEFAULT_DB_MAX_RETRIES = 10
DEFAULT_DB_RETRY_WAIT = 3 DEFAULT_DB_RETRY_WAIT = 3
DEFAULT_COMMIT_INTERVAL = 1
KEEPALIVE_TIME = 30 KEEPALIVE_TIME = 30
# Controls how often we clean up
# States and Events objects
EXPIRE_AFTER_COMMITS = 120
CONF_AUTO_PURGE = "auto_purge" CONF_AUTO_PURGE = "auto_purge"
CONF_DB_URL = "db_url" CONF_DB_URL = "db_url"
CONF_DB_MAX_RETRIES = "db_max_retries" CONF_DB_MAX_RETRIES = "db_max_retries"
@ -91,9 +96,9 @@ CONFIG_SCHEMA = vol.Schema(
vol.Coerce(int), vol.Range(min=0) vol.Coerce(int), vol.Range(min=0)
), ),
vol.Optional(CONF_DB_URL): cv.string, vol.Optional(CONF_DB_URL): cv.string,
vol.Optional(CONF_COMMIT_INTERVAL, default=1): vol.All( vol.Optional(
vol.Coerce(int), vol.Range(min=0) CONF_COMMIT_INTERVAL, default=DEFAULT_COMMIT_INTERVAL
), ): vol.All(vol.Coerce(int), vol.Range(min=0)),
vol.Optional( vol.Optional(
CONF_DB_MAX_RETRIES, default=DEFAULT_DB_MAX_RETRIES CONF_DB_MAX_RETRIES, default=DEFAULT_DB_MAX_RETRIES
): cv.positive_int, ): cv.positive_int,
@ -238,6 +243,7 @@ class Recorder(threading.Thread):
self.exclude_t = exclude_t self.exclude_t = exclude_t
self._timechanges_seen = 0 self._timechanges_seen = 0
self._commits_without_expire = 0
self._keepalive_count = 0 self._keepalive_count = 0
self._old_states = {} self._old_states = {}
self._pending_expunge = [] self._pending_expunge = []
@ -345,6 +351,7 @@ class Recorder(threading.Thread):
) )
self.event_session = self.get_session() self.event_session = self.get_session()
self.event_session.expire_on_commit = False
# Use a session for the event read loop # Use a session for the event read loop
# with a commit every time the event time # with a commit every time the event time
# has changed. This reduces the disk io. # has changed. This reduces the disk io.
@ -485,24 +492,35 @@ class Recorder(threading.Thread):
try: try:
self.event_session = self.get_session() self.event_session = self.get_session()
self.event_session.expire_on_commit = False
except Exception as err: # pylint: disable=broad-except except Exception as err: # pylint: disable=broad-except
# Must catch the exception to prevent the loop from collapsing # Must catch the exception to prevent the loop from collapsing
_LOGGER.exception("Error while creating new event session: %s", err) _LOGGER.exception("Error while creating new event session: %s", err)
def _commit_event_session(self): def _commit_event_session(self):
self._commits_without_expire += 1
try: try:
self.event_session.flush() if self._pending_expunge:
for dbstate in self._pending_expunge: self.event_session.flush()
# Expunge the state so its not expired for dbstate in self._pending_expunge:
# until we use it later for dbstate.old_state # Expunge the state so its not expired
self.event_session.expunge(dbstate) # until we use it later for dbstate.old_state
self._pending_expunge = [] self.event_session.expunge(dbstate)
self._pending_expunge = []
self.event_session.commit() self.event_session.commit()
except Exception as err: except Exception as err:
_LOGGER.error("Error executing query: %s", err) _LOGGER.error("Error executing query: %s", err)
self.event_session.rollback() self.event_session.rollback()
raise raise
# Expire is an expensive operation (frequently more expensive
# than the flush and commit itself) so we only
# do it after EXPIRE_AFTER_COMMITS commits
if self._commits_without_expire == EXPIRE_AFTER_COMMITS:
self._commits_without_expire = 0
self.event_session.expire_all()
@callback @callback
def event_listener(self, event): def event_listener(self, event):
"""Listen for new events and put them in the process queue.""" """Listen for new events and put them in the process queue."""

View File

@ -1,12 +1,12 @@
"""Common test utils for working with recorder.""" """Common test utils for working with recorder."""
from datetime import timedelta
from homeassistant.components import recorder from homeassistant.components import recorder
from homeassistant.util import dt as dt_util from homeassistant.util import dt as dt_util
from tests.common import fire_time_changed from tests.common import fire_time_changed
DB_COMMIT_INTERVAL = 50
def wait_recording_done(hass): def wait_recording_done(hass):
"""Block till recording is done.""" """Block till recording is done."""
@ -18,6 +18,6 @@ def wait_recording_done(hass):
def trigger_db_commit(hass): def trigger_db_commit(hass):
"""Force the recorder to commit.""" """Force the recorder to commit."""
for _ in range(DB_COMMIT_INTERVAL): for _ in range(recorder.DEFAULT_COMMIT_INTERVAL):
# We only commit on time change # We only commit on time change
fire_time_changed(hass, dt_util.utcnow()) fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=1))