Ensure recorder data integrity and MySQL lock error handling (#37228)

Franck Nijhof 2020-06-30 01:23:11 +02:00 committed by GitHub
parent ac237ee10f
commit 79f131066c
2 changed files with 40 additions and 26 deletions

homeassistant/components/recorder/purge.py

@@ -1,8 +1,9 @@
 """Purge old data helper."""
 from datetime import timedelta
 import logging
+import time
 
-from sqlalchemy.exc import SQLAlchemyError
+from sqlalchemy.exc import OperationalError, SQLAlchemyError
 
 import homeassistant.util.dt as dt_util
@@ -18,47 +19,46 @@ def purge_old_data(instance, purge_days: int, repack: bool) -> bool:
     Cleans up an timeframe of an hour, based on the oldest record.
     """
     purge_before = dt_util.utcnow() - timedelta(days=purge_days)
-    _LOGGER.debug("Purging events before %s", purge_before)
+    _LOGGER.debug("Purging states and events before target %s", purge_before)
 
     try:
         with session_scope(session=instance.get_session()) as session:
+            # Purge a max of 1 hour, based on the oldest states or events record
+            batch_purge_before = purge_before
+
             query = session.query(States).order_by(States.last_updated.asc()).limit(1)
             states = execute(query, to_native=True, validate_entity_ids=False)
-            states_purge_before = purge_before
             if states:
-                states_purge_before = min(
-                    purge_before, states[0].last_updated + timedelta(hours=1)
+                batch_purge_before = min(
+                    batch_purge_before, states[0].last_updated + timedelta(hours=1),
                 )
-            deleted_rows_states = (
-                session.query(States)
-                .filter(States.last_updated < states_purge_before)
-                .delete(synchronize_session=False)
-            )
-            _LOGGER.debug("Deleted %s states", deleted_rows_states)
 
             query = session.query(Events).order_by(Events.time_fired.asc()).limit(1)
             events = execute(query, to_native=True)
-            events_purge_before = purge_before
             if events:
-                events_purge_before = min(
-                    purge_before, events[0].time_fired + timedelta(hours=1)
+                batch_purge_before = min(
+                    batch_purge_before, events[0].time_fired + timedelta(hours=1),
                 )
-            deleted_rows_events = (
-                session.query(Events)
-                .filter(Events.time_fired < events_purge_before)
-                .delete(synchronize_session=False)
-            )
-            _LOGGER.debug("Deleted %s events", deleted_rows_events)
+
+            _LOGGER.debug("Purging states and events before %s", batch_purge_before)
+
+            deleted_rows = (
+                session.query(States)
+                .filter(States.last_updated < batch_purge_before)
+                .delete(synchronize_session=False)
+            )
+            _LOGGER.debug("Deleted %s states", deleted_rows)
+
+            deleted_rows = (
+                session.query(Events)
+                .filter(Events.time_fired < batch_purge_before)
+                .delete(synchronize_session=False)
+            )
+            _LOGGER.debug("Deleted %s events", deleted_rows)
 
             # If states or events purging isn't processing the purge_before yet,
             # return false, as we are not done yet.
-            if (states_purge_before and states_purge_before != purge_before) or (
-                events_purge_before and events_purge_before != purge_before
-            ):
+            if batch_purge_before != purge_before:
                 _LOGGER.debug("Purging hasn't fully completed yet.")
                 return False
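
The core of this hunk is the switch from two independent cutoffs (states_purge_before, events_purge_before) to a single shared batch_purge_before, so states and events are always purged up to the same point in time. A minimal sketch of that windowing logic, lifted out of the diff above (the compute_batch_window name and the oldest_* parameters are illustrative, not part of the commit):

from datetime import datetime, timedelta

def compute_batch_window(purge_before, oldest_state=None, oldest_event=None):
    """Clamp one shared purge window to at most 1 hour past the oldest row."""
    batch_purge_before = purge_before
    if oldest_state is not None:
        batch_purge_before = min(batch_purge_before, oldest_state + timedelta(hours=1))
    if oldest_event is not None:
        batch_purge_before = min(batch_purge_before, oldest_event + timedelta(hours=1))
    return batch_purge_before

# The oldest event is months before the purge target, so the first batch
# only covers the hour after that event; the caller must run again.
window = compute_batch_window(datetime(2020, 6, 1), oldest_event=datetime(2020, 1, 1))
assert window == datetime(2020, 1, 1, 1)

Because the window is shared, the single comparison batch_purge_before != purge_before is enough to tell the caller whether another batch is still needed.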
@@ -80,7 +80,21 @@ def purge_old_data(instance, purge_days: int, repack: bool) -> bool:
                 _LOGGER.debug("Optimizing SQL DB to free space")
                 instance.engine.execute("OPTIMIZE TABLE states, events, recorder_runs")
 
+    except OperationalError as err:
+        # Retry when one of the following MySQL errors occurred:
+        # 1205: Lock wait timeout exceeded; try restarting transaction
+        # 1206: The total number of locks exceeds the lock table size
+        # 1213: Deadlock found when trying to get lock; try restarting transaction
+        if instance.engine.driver in ("mysqldb", "pymysql") and err.orig.args[0] in (
+            1205,
+            1206,
+            1213,
+        ):
+            _LOGGER.info("%s; purge not completed, retrying", err.orig.args[1])
+            time.sleep(instance.db_retry_wait)
+            return False
+        _LOGGER.warning("Error purging history: %s.", err)
     except SQLAlchemyError as err:
         _LOGGER.warning("Error purging history: %s.", err)
 
     return True
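
Note that the new OperationalError branch does not retry inside purge_old_data: it sleeps for instance.db_retry_wait and then returns False, leaving the actual retry to the caller. A rough sketch of that caller-side loop, assuming only the return contract shown above (run_purge and MAX_ATTEMPTS are hypothetical names, not part of this commit):

# Hypothetical driver: both MySQL lock errors and not-yet-finished batches
# surface as a False return, so batching and retrying share one control path.
MAX_ATTEMPTS = 10

def run_purge(instance, purge_days: int, repack: bool) -> bool:
    for _ in range(MAX_ATTEMPTS):
        if purge_old_data(instance, purge_days, repack):
            return True  # reached the purge target
    return False  # give up for now; the next scheduled purge will resume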

tests/components/recorder/test_purge.py

@@ -224,6 +224,6 @@ class TestRecorderPurge(unittest.TestCase):
             self.hass.block_till_done()
             self.hass.data[DATA_INSTANCE].block_till_done()
             assert (
-                mock_logger.debug.mock_calls[4][1][0]
+                mock_logger.debug.mock_calls[5][1][0]
                 == "Vacuuming SQL DB to free space"
             )
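
The index bump from 4 to 5 follows directly from the purge.py hunk above: the rewritten function logs one extra debug message ("Purging states and events before %s") before any rows are deleted, shifting every later entry of mock_calls by one. As a reminder of what the [5][1][0] chain unpacks (standard unittest.mock behavior, independent of this commit):

from unittest.mock import MagicMock

log = MagicMock()
log.debug("first %s", 1)
log.debug("second %s", 2)

# Each mock_calls entry is a (name, args, kwargs) triple, so [1][0] is
# the first positional argument -- here, the log format string.
assert log.debug.mock_calls[1][1][0] == "second %s"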