mirror of
https://github.com/home-assistant/core.git
synced 2025-07-22 04:37:06 +00:00
Increase recorder test coverage (#49362)
Co-authored-by: Martin Hjelmare <marhje52@gmail.com>
This commit is contained in:
parent
b69b55987d
commit
6d137d2316
@ -45,8 +45,10 @@ from .const import CONF_DB_INTEGRITY_CHECK, DATA_INSTANCE, DOMAIN, SQLITE_URL_PR
|
|||||||
from .models import Base, Events, RecorderRuns, States
|
from .models import Base, Events, RecorderRuns, States
|
||||||
from .util import (
|
from .util import (
|
||||||
dburl_to_path,
|
dburl_to_path,
|
||||||
|
end_incomplete_runs,
|
||||||
move_away_broken_database,
|
move_away_broken_database,
|
||||||
session_scope,
|
session_scope,
|
||||||
|
setup_connection_for_dialect,
|
||||||
validate_or_move_away_sqlite_database,
|
validate_or_move_away_sqlite_database,
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -93,6 +95,9 @@ CONF_PURGE_INTERVAL = "purge_interval"
|
|||||||
CONF_EVENT_TYPES = "event_types"
|
CONF_EVENT_TYPES = "event_types"
|
||||||
CONF_COMMIT_INTERVAL = "commit_interval"
|
CONF_COMMIT_INTERVAL = "commit_interval"
|
||||||
|
|
||||||
|
INVALIDATED_ERR = "Database connection invalidated"
|
||||||
|
CONNECTIVITY_ERR = "Error in database connectivity during commit"
|
||||||
|
|
||||||
EXCLUDE_SCHEMA = INCLUDE_EXCLUDE_FILTER_SCHEMA_INNER.extend(
|
EXCLUDE_SCHEMA = INCLUDE_EXCLUDE_FILTER_SCHEMA_INNER.extend(
|
||||||
{vol.Optional(CONF_EVENT_TYPES): vol.All(cv.ensure_list, [cv.string])}
|
{vol.Optional(CONF_EVENT_TYPES): vol.All(cv.ensure_list, [cv.string])}
|
||||||
)
|
)
|
||||||
@ -667,13 +672,9 @@ class Recorder(threading.Thread):
|
|||||||
self._commit_event_session()
|
self._commit_event_session()
|
||||||
return
|
return
|
||||||
except (exc.InternalError, exc.OperationalError) as err:
|
except (exc.InternalError, exc.OperationalError) as err:
|
||||||
if err.connection_invalidated:
|
|
||||||
message = "Database connection invalidated"
|
|
||||||
else:
|
|
||||||
message = "Error in database connectivity during commit"
|
|
||||||
_LOGGER.error(
|
_LOGGER.error(
|
||||||
"%s: Error executing query: %s. (retrying in %s seconds)",
|
"%s: Error executing query: %s. (retrying in %s seconds)",
|
||||||
message,
|
INVALIDATED_ERR if err.connection_invalidated else CONNECTIVITY_ERR,
|
||||||
err,
|
err,
|
||||||
self.db_retry_wait,
|
self.db_retry_wait,
|
||||||
)
|
)
|
||||||
@ -771,25 +772,9 @@ class Recorder(threading.Thread):
|
|||||||
"""Dbapi specific connection settings."""
|
"""Dbapi specific connection settings."""
|
||||||
if self._completed_database_setup:
|
if self._completed_database_setup:
|
||||||
return
|
return
|
||||||
|
self._completed_database_setup = setup_connection_for_dialect(
|
||||||
# We do not import sqlite3 here so mysql/other
|
self.engine.dialect.name, dbapi_connection
|
||||||
# users do not have to pay for it to be loaded in
|
)
|
||||||
# memory
|
|
||||||
if self.db_url.startswith(SQLITE_URL_PREFIX):
|
|
||||||
old_isolation = dbapi_connection.isolation_level
|
|
||||||
dbapi_connection.isolation_level = None
|
|
||||||
cursor = dbapi_connection.cursor()
|
|
||||||
cursor.execute("PRAGMA journal_mode=WAL")
|
|
||||||
cursor.close()
|
|
||||||
dbapi_connection.isolation_level = old_isolation
|
|
||||||
# WAL mode only needs to be setup once
|
|
||||||
# instead of every time we open the sqlite connection
|
|
||||||
# as its persistent and isn't free to call every time.
|
|
||||||
self._completed_database_setup = True
|
|
||||||
elif self.db_url.startswith("mysql"):
|
|
||||||
cursor = dbapi_connection.cursor()
|
|
||||||
cursor.execute("SET session wait_timeout=28800")
|
|
||||||
cursor.close()
|
|
||||||
|
|
||||||
if self.db_url == SQLITE_URL_PREFIX or ":memory:" in self.db_url:
|
if self.db_url == SQLITE_URL_PREFIX or ":memory:" in self.db_url:
|
||||||
kwargs["connect_args"] = {"check_same_thread": False}
|
kwargs["connect_args"] = {"check_same_thread": False}
|
||||||
@ -825,17 +810,9 @@ class Recorder(threading.Thread):
|
|||||||
def _setup_run(self):
|
def _setup_run(self):
|
||||||
"""Log the start of the current run."""
|
"""Log the start of the current run."""
|
||||||
with session_scope(session=self.get_session()) as session:
|
with session_scope(session=self.get_session()) as session:
|
||||||
for run in session.query(RecorderRuns).filter_by(end=None):
|
start = self.recording_start
|
||||||
run.closed_incorrect = True
|
end_incomplete_runs(session, start)
|
||||||
run.end = self.recording_start
|
self.run_info = RecorderRuns(start=start, created=dt_util.utcnow())
|
||||||
_LOGGER.warning(
|
|
||||||
"Ended unfinished session (id=%s from %s)", run.run_id, run.start
|
|
||||||
)
|
|
||||||
session.add(run)
|
|
||||||
|
|
||||||
self.run_info = RecorderRuns(
|
|
||||||
start=self.recording_start, created=dt_util.utcnow()
|
|
||||||
)
|
|
||||||
session.add(self.run_info)
|
session.add(self.run_info)
|
||||||
session.flush()
|
session.flush()
|
||||||
session.expunge(self.run_info)
|
session.expunge(self.run_info)
|
||||||
|
@ -22,6 +22,12 @@ if TYPE_CHECKING:
|
|||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Retry when one of the following MySQL errors occurred:
|
||||||
|
RETRYABLE_MYSQL_ERRORS = (1205, 1206, 1213)
|
||||||
|
# 1205: Lock wait timeout exceeded; try restarting transaction
|
||||||
|
# 1206: The total number of locks exceeds the lock table size
|
||||||
|
# 1213: Deadlock found when trying to get lock; try restarting transaction
|
||||||
|
|
||||||
|
|
||||||
def purge_old_data(
|
def purge_old_data(
|
||||||
instance: Recorder, purge_days: int, repack: bool, apply_filter: bool = False
|
instance: Recorder, purge_days: int, repack: bool, apply_filter: bool = False
|
||||||
@ -55,14 +61,9 @@ def purge_old_data(
|
|||||||
if repack:
|
if repack:
|
||||||
repack_database(instance)
|
repack_database(instance)
|
||||||
except OperationalError as err:
|
except OperationalError as err:
|
||||||
# Retry when one of the following MySQL errors occurred:
|
if (
|
||||||
# 1205: Lock wait timeout exceeded; try restarting transaction
|
instance.engine.dialect.name == "mysql"
|
||||||
# 1206: The total number of locks exceeds the lock table size
|
and err.orig.args[0] in RETRYABLE_MYSQL_ERRORS
|
||||||
# 1213: Deadlock found when trying to get lock; try restarting transaction
|
|
||||||
if instance.engine.driver in ("mysqldb", "pymysql") and err.orig.args[0] in (
|
|
||||||
1205,
|
|
||||||
1206,
|
|
||||||
1213,
|
|
||||||
):
|
):
|
||||||
_LOGGER.info("%s; purge not completed, retrying", err.orig.args[1])
|
_LOGGER.info("%s; purge not completed, retrying", err.orig.args[1])
|
||||||
time.sleep(instance.db_retry_wait)
|
time.sleep(instance.db_retry_wait)
|
||||||
|
@ -19,6 +19,7 @@ from .models import (
|
|||||||
ALL_TABLES,
|
ALL_TABLES,
|
||||||
TABLE_RECORDER_RUNS,
|
TABLE_RECORDER_RUNS,
|
||||||
TABLE_SCHEMA_CHANGES,
|
TABLE_SCHEMA_CHANGES,
|
||||||
|
RecorderRuns,
|
||||||
process_timestamp,
|
process_timestamp,
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -230,3 +231,42 @@ def move_away_broken_database(dbfile: str) -> None:
|
|||||||
if not os.path.exists(path):
|
if not os.path.exists(path):
|
||||||
continue
|
continue
|
||||||
os.rename(path, f"{path}{corrupt_postfix}")
|
os.rename(path, f"{path}{corrupt_postfix}")
|
||||||
|
|
||||||
|
|
||||||
|
def execute_on_connection(dbapi_connection, statement):
|
||||||
|
"""Execute a single statement with a dbapi connection."""
|
||||||
|
cursor = dbapi_connection.cursor()
|
||||||
|
cursor.execute(statement)
|
||||||
|
cursor.close()
|
||||||
|
|
||||||
|
|
||||||
|
def setup_connection_for_dialect(dialect_name, dbapi_connection):
|
||||||
|
"""Execute statements needed for dialect connection."""
|
||||||
|
# Returns False if the the connection needs to be setup
|
||||||
|
# on the next connection, returns True if the connection
|
||||||
|
# never needs to be setup again.
|
||||||
|
if dialect_name == "sqlite":
|
||||||
|
old_isolation = dbapi_connection.isolation_level
|
||||||
|
dbapi_connection.isolation_level = None
|
||||||
|
execute_on_connection(dbapi_connection, "PRAGMA journal_mode=WAL")
|
||||||
|
dbapi_connection.isolation_level = old_isolation
|
||||||
|
# WAL mode only needs to be setup once
|
||||||
|
# instead of every time we open the sqlite connection
|
||||||
|
# as its persistent and isn't free to call every time.
|
||||||
|
return True
|
||||||
|
|
||||||
|
if dialect_name == "mysql":
|
||||||
|
execute_on_connection(dbapi_connection, "SET session wait_timeout=28800")
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def end_incomplete_runs(session, start_time):
|
||||||
|
"""End any incomplete recorder runs."""
|
||||||
|
for run in session.query(RecorderRuns).filter_by(end=None):
|
||||||
|
run.closed_incorrect = True
|
||||||
|
run.end = start_time
|
||||||
|
_LOGGER.warning(
|
||||||
|
"Ended unfinished session (id=%s from %s)", run.run_id, run.start
|
||||||
|
)
|
||||||
|
session.add(run)
|
||||||
|
@ -2,9 +2,9 @@
|
|||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
import json
|
import json
|
||||||
import sqlite3
|
import sqlite3
|
||||||
from unittest.mock import patch
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
from sqlalchemy.exc import DatabaseError
|
from sqlalchemy.exc import DatabaseError, OperationalError
|
||||||
from sqlalchemy.orm.session import Session
|
from sqlalchemy.orm.session import Session
|
||||||
|
|
||||||
from homeassistant.components import recorder
|
from homeassistant.components import recorder
|
||||||
@ -88,6 +88,67 @@ async def test_purge_old_states_encouters_database_corruption(
|
|||||||
assert states_after_purge.count() == 0
|
assert states_after_purge.count() == 0
|
||||||
|
|
||||||
|
|
||||||
|
async def test_purge_old_states_encounters_temporary_mysql_error(
|
||||||
|
hass: HomeAssistantType,
|
||||||
|
async_setup_recorder_instance: SetupRecorderInstanceT,
|
||||||
|
caplog,
|
||||||
|
):
|
||||||
|
"""Test retry on specific mysql operational errors."""
|
||||||
|
instance = await async_setup_recorder_instance(hass)
|
||||||
|
|
||||||
|
await _add_test_states(hass, instance)
|
||||||
|
await async_wait_recording_done_without_instance(hass)
|
||||||
|
|
||||||
|
mysql_exception = OperationalError("statement", {}, [])
|
||||||
|
mysql_exception.orig = MagicMock(args=(1205, "retryable"))
|
||||||
|
|
||||||
|
with patch(
|
||||||
|
"homeassistant.components.recorder.purge.time.sleep"
|
||||||
|
) as sleep_mock, patch(
|
||||||
|
"homeassistant.components.recorder.purge._purge_old_recorder_runs",
|
||||||
|
side_effect=[mysql_exception, None],
|
||||||
|
), patch.object(
|
||||||
|
instance.engine.dialect, "name", "mysql"
|
||||||
|
):
|
||||||
|
await hass.services.async_call(
|
||||||
|
recorder.DOMAIN, recorder.SERVICE_PURGE, {"keep_days": 0}
|
||||||
|
)
|
||||||
|
await hass.async_block_till_done()
|
||||||
|
await async_wait_recording_done_without_instance(hass)
|
||||||
|
await async_wait_recording_done_without_instance(hass)
|
||||||
|
|
||||||
|
assert "retrying" in caplog.text
|
||||||
|
assert sleep_mock.called
|
||||||
|
|
||||||
|
|
||||||
|
async def test_purge_old_states_encounters_operational_error(
|
||||||
|
hass: HomeAssistantType,
|
||||||
|
async_setup_recorder_instance: SetupRecorderInstanceT,
|
||||||
|
caplog,
|
||||||
|
):
|
||||||
|
"""Test error on operational errors that are not mysql does not retry."""
|
||||||
|
instance = await async_setup_recorder_instance(hass)
|
||||||
|
|
||||||
|
await _add_test_states(hass, instance)
|
||||||
|
await async_wait_recording_done_without_instance(hass)
|
||||||
|
|
||||||
|
exception = OperationalError("statement", {}, [])
|
||||||
|
|
||||||
|
with patch(
|
||||||
|
"homeassistant.components.recorder.purge._purge_old_recorder_runs",
|
||||||
|
side_effect=exception,
|
||||||
|
):
|
||||||
|
await hass.services.async_call(
|
||||||
|
recorder.DOMAIN, recorder.SERVICE_PURGE, {"keep_days": 0}
|
||||||
|
)
|
||||||
|
await hass.async_block_till_done()
|
||||||
|
await async_wait_recording_done_without_instance(hass)
|
||||||
|
await async_wait_recording_done_without_instance(hass)
|
||||||
|
|
||||||
|
assert "retrying" not in caplog.text
|
||||||
|
assert "Error purging history" in caplog.text
|
||||||
|
|
||||||
|
|
||||||
async def test_purge_old_events(
|
async def test_purge_old_events(
|
||||||
hass: HomeAssistantType, async_setup_recorder_instance: SetupRecorderInstanceT
|
hass: HomeAssistantType, async_setup_recorder_instance: SetupRecorderInstanceT
|
||||||
):
|
):
|
||||||
|
@ -6,8 +6,10 @@ from unittest.mock import MagicMock, patch
|
|||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from homeassistant.components.recorder import util
|
from homeassistant.components.recorder import run_information_with_session, util
|
||||||
from homeassistant.components.recorder.const import DATA_INSTANCE, SQLITE_URL_PREFIX
|
from homeassistant.components.recorder.const import DATA_INSTANCE, SQLITE_URL_PREFIX
|
||||||
|
from homeassistant.components.recorder.models import RecorderRuns
|
||||||
|
from homeassistant.components.recorder.util import end_incomplete_runs, session_scope
|
||||||
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
|
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
|
||||||
from homeassistant.util import dt as dt_util
|
from homeassistant.util import dt as dt_util
|
||||||
|
|
||||||
@ -37,6 +39,16 @@ def hass_recorder():
|
|||||||
hass.stop()
|
hass.stop()
|
||||||
|
|
||||||
|
|
||||||
|
def test_session_scope_not_setup(hass_recorder):
|
||||||
|
"""Try to create a session scope when not setup."""
|
||||||
|
hass = hass_recorder()
|
||||||
|
with patch.object(
|
||||||
|
hass.data[DATA_INSTANCE], "get_session", return_value=None
|
||||||
|
), pytest.raises(RuntimeError):
|
||||||
|
with util.session_scope(hass=hass):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
def test_recorder_bad_commit(hass_recorder):
|
def test_recorder_bad_commit(hass_recorder):
|
||||||
"""Bad _commit should retry 3 times."""
|
"""Bad _commit should retry 3 times."""
|
||||||
hass = hass_recorder()
|
hass = hass_recorder()
|
||||||
@ -130,6 +142,36 @@ async def test_last_run_was_recently_clean(hass):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_setup_connection_for_dialect_mysql():
|
||||||
|
"""Test setting up the connection for a mysql dialect."""
|
||||||
|
execute_mock = MagicMock()
|
||||||
|
close_mock = MagicMock()
|
||||||
|
|
||||||
|
def _make_cursor_mock(*_):
|
||||||
|
return MagicMock(execute=execute_mock, close=close_mock)
|
||||||
|
|
||||||
|
dbapi_connection = MagicMock(cursor=_make_cursor_mock)
|
||||||
|
|
||||||
|
assert util.setup_connection_for_dialect("mysql", dbapi_connection) is False
|
||||||
|
|
||||||
|
assert execute_mock.call_args[0][0] == "SET session wait_timeout=28800"
|
||||||
|
|
||||||
|
|
||||||
|
def test_setup_connection_for_dialect_sqlite():
|
||||||
|
"""Test setting up the connection for a sqlite dialect."""
|
||||||
|
execute_mock = MagicMock()
|
||||||
|
close_mock = MagicMock()
|
||||||
|
|
||||||
|
def _make_cursor_mock(*_):
|
||||||
|
return MagicMock(execute=execute_mock, close=close_mock)
|
||||||
|
|
||||||
|
dbapi_connection = MagicMock(cursor=_make_cursor_mock)
|
||||||
|
|
||||||
|
assert util.setup_connection_for_dialect("sqlite", dbapi_connection) is True
|
||||||
|
|
||||||
|
assert execute_mock.call_args[0][0] == "PRAGMA journal_mode=WAL"
|
||||||
|
|
||||||
|
|
||||||
def test_basic_sanity_check(hass_recorder):
|
def test_basic_sanity_check(hass_recorder):
|
||||||
"""Test the basic sanity checks with a missing table."""
|
"""Test the basic sanity checks with a missing table."""
|
||||||
hass = hass_recorder()
|
hass = hass_recorder()
|
||||||
@ -194,3 +236,28 @@ def test_combined_checks(hass_recorder, caplog):
|
|||||||
caplog.clear()
|
caplog.clear()
|
||||||
with pytest.raises(sqlite3.DatabaseError):
|
with pytest.raises(sqlite3.DatabaseError):
|
||||||
util.run_checks_on_open_db("fake_db_path", cursor)
|
util.run_checks_on_open_db("fake_db_path", cursor)
|
||||||
|
|
||||||
|
|
||||||
|
def test_end_incomplete_runs(hass_recorder, caplog):
|
||||||
|
"""Ensure we can end incomplete runs."""
|
||||||
|
hass = hass_recorder()
|
||||||
|
|
||||||
|
with session_scope(hass=hass) as session:
|
||||||
|
run_info = run_information_with_session(session)
|
||||||
|
assert isinstance(run_info, RecorderRuns)
|
||||||
|
assert run_info.closed_incorrect is False
|
||||||
|
|
||||||
|
now = dt_util.utcnow()
|
||||||
|
now_without_tz = now.replace(tzinfo=None)
|
||||||
|
end_incomplete_runs(session, now)
|
||||||
|
run_info = run_information_with_session(session)
|
||||||
|
assert run_info.closed_incorrect is True
|
||||||
|
assert run_info.end == now_without_tz
|
||||||
|
session.flush()
|
||||||
|
|
||||||
|
later = dt_util.utcnow()
|
||||||
|
end_incomplete_runs(session, later)
|
||||||
|
run_info = run_information_with_session(session)
|
||||||
|
assert run_info.end == now_without_tz
|
||||||
|
|
||||||
|
assert "Ended unfinished session" in caplog.text
|
||||||
|
Loading…
x
Reference in New Issue
Block a user