From 6d137d23160f6e30ca3dcd63905637be61578ead Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 19 Apr 2021 05:22:38 -1000 Subject: [PATCH] Increase recorder test coverage (#49362) Co-authored-by: Martin Hjelmare --- homeassistant/components/recorder/__init__.py | 47 ++++--------- homeassistant/components/recorder/purge.py | 17 ++--- homeassistant/components/recorder/util.py | 40 +++++++++++ tests/components/recorder/test_purge.py | 65 ++++++++++++++++- tests/components/recorder/test_util.py | 69 ++++++++++++++++++- 5 files changed, 192 insertions(+), 46 deletions(-) diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py index 733d8f248a8..db20c72c81e 100644 --- a/homeassistant/components/recorder/__init__.py +++ b/homeassistant/components/recorder/__init__.py @@ -45,8 +45,10 @@ from .const import CONF_DB_INTEGRITY_CHECK, DATA_INSTANCE, DOMAIN, SQLITE_URL_PR from .models import Base, Events, RecorderRuns, States from .util import ( dburl_to_path, + end_incomplete_runs, move_away_broken_database, session_scope, + setup_connection_for_dialect, validate_or_move_away_sqlite_database, ) @@ -93,6 +95,9 @@ CONF_PURGE_INTERVAL = "purge_interval" CONF_EVENT_TYPES = "event_types" CONF_COMMIT_INTERVAL = "commit_interval" +INVALIDATED_ERR = "Database connection invalidated" +CONNECTIVITY_ERR = "Error in database connectivity during commit" + EXCLUDE_SCHEMA = INCLUDE_EXCLUDE_FILTER_SCHEMA_INNER.extend( {vol.Optional(CONF_EVENT_TYPES): vol.All(cv.ensure_list, [cv.string])} ) @@ -667,13 +672,9 @@ class Recorder(threading.Thread): self._commit_event_session() return except (exc.InternalError, exc.OperationalError) as err: - if err.connection_invalidated: - message = "Database connection invalidated" - else: - message = "Error in database connectivity during commit" _LOGGER.error( "%s: Error executing query: %s. (retrying in %s seconds)", - message, + INVALIDATED_ERR if err.connection_invalidated else CONNECTIVITY_ERR, err, self.db_retry_wait, ) @@ -771,25 +772,9 @@ class Recorder(threading.Thread): """Dbapi specific connection settings.""" if self._completed_database_setup: return - - # We do not import sqlite3 here so mysql/other - # users do not have to pay for it to be loaded in - # memory - if self.db_url.startswith(SQLITE_URL_PREFIX): - old_isolation = dbapi_connection.isolation_level - dbapi_connection.isolation_level = None - cursor = dbapi_connection.cursor() - cursor.execute("PRAGMA journal_mode=WAL") - cursor.close() - dbapi_connection.isolation_level = old_isolation - # WAL mode only needs to be setup once - # instead of every time we open the sqlite connection - # as its persistent and isn't free to call every time. - self._completed_database_setup = True - elif self.db_url.startswith("mysql"): - cursor = dbapi_connection.cursor() - cursor.execute("SET session wait_timeout=28800") - cursor.close() + self._completed_database_setup = setup_connection_for_dialect( + self.engine.dialect.name, dbapi_connection + ) if self.db_url == SQLITE_URL_PREFIX or ":memory:" in self.db_url: kwargs["connect_args"] = {"check_same_thread": False} @@ -825,17 +810,9 @@ class Recorder(threading.Thread): def _setup_run(self): """Log the start of the current run.""" with session_scope(session=self.get_session()) as session: - for run in session.query(RecorderRuns).filter_by(end=None): - run.closed_incorrect = True - run.end = self.recording_start - _LOGGER.warning( - "Ended unfinished session (id=%s from %s)", run.run_id, run.start - ) - session.add(run) - - self.run_info = RecorderRuns( - start=self.recording_start, created=dt_util.utcnow() - ) + start = self.recording_start + end_incomplete_runs(session, start) + self.run_info = RecorderRuns(start=start, created=dt_util.utcnow()) session.add(self.run_info) session.flush() session.expunge(self.run_info) diff --git a/homeassistant/components/recorder/purge.py b/homeassistant/components/recorder/purge.py index 424070156b0..22202ad1bbf 100644 --- a/homeassistant/components/recorder/purge.py +++ b/homeassistant/components/recorder/purge.py @@ -22,6 +22,12 @@ if TYPE_CHECKING: _LOGGER = logging.getLogger(__name__) +# Retry when one of the following MySQL errors occurred: +RETRYABLE_MYSQL_ERRORS = (1205, 1206, 1213) +# 1205: Lock wait timeout exceeded; try restarting transaction +# 1206: The total number of locks exceeds the lock table size +# 1213: Deadlock found when trying to get lock; try restarting transaction + def purge_old_data( instance: Recorder, purge_days: int, repack: bool, apply_filter: bool = False @@ -55,14 +61,9 @@ def purge_old_data( if repack: repack_database(instance) except OperationalError as err: - # Retry when one of the following MySQL errors occurred: - # 1205: Lock wait timeout exceeded; try restarting transaction - # 1206: The total number of locks exceeds the lock table size - # 1213: Deadlock found when trying to get lock; try restarting transaction - if instance.engine.driver in ("mysqldb", "pymysql") and err.orig.args[0] in ( - 1205, - 1206, - 1213, + if ( + instance.engine.dialect.name == "mysql" + and err.orig.args[0] in RETRYABLE_MYSQL_ERRORS ): _LOGGER.info("%s; purge not completed, retrying", err.orig.args[1]) time.sleep(instance.db_retry_wait) diff --git a/homeassistant/components/recorder/util.py b/homeassistant/components/recorder/util.py index 89f74c44f4e..c18ff0a9830 100644 --- a/homeassistant/components/recorder/util.py +++ b/homeassistant/components/recorder/util.py @@ -19,6 +19,7 @@ from .models import ( ALL_TABLES, TABLE_RECORDER_RUNS, TABLE_SCHEMA_CHANGES, + RecorderRuns, process_timestamp, ) @@ -230,3 +231,42 @@ def move_away_broken_database(dbfile: str) -> None: if not os.path.exists(path): continue os.rename(path, f"{path}{corrupt_postfix}") + + +def execute_on_connection(dbapi_connection, statement): + """Execute a single statement with a dbapi connection.""" + cursor = dbapi_connection.cursor() + cursor.execute(statement) + cursor.close() + + +def setup_connection_for_dialect(dialect_name, dbapi_connection): + """Execute statements needed for dialect connection.""" + # Returns False if the the connection needs to be setup + # on the next connection, returns True if the connection + # never needs to be setup again. + if dialect_name == "sqlite": + old_isolation = dbapi_connection.isolation_level + dbapi_connection.isolation_level = None + execute_on_connection(dbapi_connection, "PRAGMA journal_mode=WAL") + dbapi_connection.isolation_level = old_isolation + # WAL mode only needs to be setup once + # instead of every time we open the sqlite connection + # as its persistent and isn't free to call every time. + return True + + if dialect_name == "mysql": + execute_on_connection(dbapi_connection, "SET session wait_timeout=28800") + + return False + + +def end_incomplete_runs(session, start_time): + """End any incomplete recorder runs.""" + for run in session.query(RecorderRuns).filter_by(end=None): + run.closed_incorrect = True + run.end = start_time + _LOGGER.warning( + "Ended unfinished session (id=%s from %s)", run.run_id, run.start + ) + session.add(run) diff --git a/tests/components/recorder/test_purge.py b/tests/components/recorder/test_purge.py index b97873df62e..d1825663ccc 100644 --- a/tests/components/recorder/test_purge.py +++ b/tests/components/recorder/test_purge.py @@ -2,9 +2,9 @@ from datetime import datetime, timedelta import json import sqlite3 -from unittest.mock import patch +from unittest.mock import MagicMock, patch -from sqlalchemy.exc import DatabaseError +from sqlalchemy.exc import DatabaseError, OperationalError from sqlalchemy.orm.session import Session from homeassistant.components import recorder @@ -88,6 +88,67 @@ async def test_purge_old_states_encouters_database_corruption( assert states_after_purge.count() == 0 +async def test_purge_old_states_encounters_temporary_mysql_error( + hass: HomeAssistantType, + async_setup_recorder_instance: SetupRecorderInstanceT, + caplog, +): + """Test retry on specific mysql operational errors.""" + instance = await async_setup_recorder_instance(hass) + + await _add_test_states(hass, instance) + await async_wait_recording_done_without_instance(hass) + + mysql_exception = OperationalError("statement", {}, []) + mysql_exception.orig = MagicMock(args=(1205, "retryable")) + + with patch( + "homeassistant.components.recorder.purge.time.sleep" + ) as sleep_mock, patch( + "homeassistant.components.recorder.purge._purge_old_recorder_runs", + side_effect=[mysql_exception, None], + ), patch.object( + instance.engine.dialect, "name", "mysql" + ): + await hass.services.async_call( + recorder.DOMAIN, recorder.SERVICE_PURGE, {"keep_days": 0} + ) + await hass.async_block_till_done() + await async_wait_recording_done_without_instance(hass) + await async_wait_recording_done_without_instance(hass) + + assert "retrying" in caplog.text + assert sleep_mock.called + + +async def test_purge_old_states_encounters_operational_error( + hass: HomeAssistantType, + async_setup_recorder_instance: SetupRecorderInstanceT, + caplog, +): + """Test error on operational errors that are not mysql does not retry.""" + instance = await async_setup_recorder_instance(hass) + + await _add_test_states(hass, instance) + await async_wait_recording_done_without_instance(hass) + + exception = OperationalError("statement", {}, []) + + with patch( + "homeassistant.components.recorder.purge._purge_old_recorder_runs", + side_effect=exception, + ): + await hass.services.async_call( + recorder.DOMAIN, recorder.SERVICE_PURGE, {"keep_days": 0} + ) + await hass.async_block_till_done() + await async_wait_recording_done_without_instance(hass) + await async_wait_recording_done_without_instance(hass) + + assert "retrying" not in caplog.text + assert "Error purging history" in caplog.text + + async def test_purge_old_events( hass: HomeAssistantType, async_setup_recorder_instance: SetupRecorderInstanceT ): diff --git a/tests/components/recorder/test_util.py b/tests/components/recorder/test_util.py index 4da635209b3..e4d942246c5 100644 --- a/tests/components/recorder/test_util.py +++ b/tests/components/recorder/test_util.py @@ -6,8 +6,10 @@ from unittest.mock import MagicMock, patch import pytest -from homeassistant.components.recorder import util +from homeassistant.components.recorder import run_information_with_session, util from homeassistant.components.recorder.const import DATA_INSTANCE, SQLITE_URL_PREFIX +from homeassistant.components.recorder.models import RecorderRuns +from homeassistant.components.recorder.util import end_incomplete_runs, session_scope from homeassistant.const import EVENT_HOMEASSISTANT_STOP from homeassistant.util import dt as dt_util @@ -37,6 +39,16 @@ def hass_recorder(): hass.stop() +def test_session_scope_not_setup(hass_recorder): + """Try to create a session scope when not setup.""" + hass = hass_recorder() + with patch.object( + hass.data[DATA_INSTANCE], "get_session", return_value=None + ), pytest.raises(RuntimeError): + with util.session_scope(hass=hass): + pass + + def test_recorder_bad_commit(hass_recorder): """Bad _commit should retry 3 times.""" hass = hass_recorder() @@ -130,6 +142,36 @@ async def test_last_run_was_recently_clean(hass): ) +def test_setup_connection_for_dialect_mysql(): + """Test setting up the connection for a mysql dialect.""" + execute_mock = MagicMock() + close_mock = MagicMock() + + def _make_cursor_mock(*_): + return MagicMock(execute=execute_mock, close=close_mock) + + dbapi_connection = MagicMock(cursor=_make_cursor_mock) + + assert util.setup_connection_for_dialect("mysql", dbapi_connection) is False + + assert execute_mock.call_args[0][0] == "SET session wait_timeout=28800" + + +def test_setup_connection_for_dialect_sqlite(): + """Test setting up the connection for a sqlite dialect.""" + execute_mock = MagicMock() + close_mock = MagicMock() + + def _make_cursor_mock(*_): + return MagicMock(execute=execute_mock, close=close_mock) + + dbapi_connection = MagicMock(cursor=_make_cursor_mock) + + assert util.setup_connection_for_dialect("sqlite", dbapi_connection) is True + + assert execute_mock.call_args[0][0] == "PRAGMA journal_mode=WAL" + + def test_basic_sanity_check(hass_recorder): """Test the basic sanity checks with a missing table.""" hass = hass_recorder() @@ -194,3 +236,28 @@ def test_combined_checks(hass_recorder, caplog): caplog.clear() with pytest.raises(sqlite3.DatabaseError): util.run_checks_on_open_db("fake_db_path", cursor) + + +def test_end_incomplete_runs(hass_recorder, caplog): + """Ensure we can end incomplete runs.""" + hass = hass_recorder() + + with session_scope(hass=hass) as session: + run_info = run_information_with_session(session) + assert isinstance(run_info, RecorderRuns) + assert run_info.closed_incorrect is False + + now = dt_util.utcnow() + now_without_tz = now.replace(tzinfo=None) + end_incomplete_runs(session, now) + run_info = run_information_with_session(session) + assert run_info.closed_incorrect is True + assert run_info.end == now_without_tz + session.flush() + + later = dt_util.utcnow() + end_incomplete_runs(session, later) + run_info = run_information_with_session(session) + assert run_info.end == now_without_tz + + assert "Ended unfinished session" in caplog.text