Add recorder test fixture for skipping tests by DB engine (#121118)

* Add recorder test fixture for skipping tests by DB engine

* Fix mistake
This commit is contained in:
Erik Montnemery 2024-07-04 00:02:46 +02:00 committed by GitHub
parent 73716ea529
commit 408e524551
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 185 additions and 121 deletions

View File

@ -40,6 +40,8 @@ async def test_validate_db_schema(
assert "Database is about to correct DB schema errors" not in caplog.text
@pytest.mark.skip_on_db_engine(["postgresql", "sqlite"])
@pytest.mark.usefixtures("skip_by_db_engine")
async def test_validate_db_schema_fix_utf8_issue_good_schema(
async_setup_recorder_instance: RecorderInstanceGenerator,
hass: HomeAssistant,
@ -47,9 +49,6 @@ async def test_validate_db_schema_fix_utf8_issue_good_schema(
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test validating DB schema with MySQL when the schema is correct."""
if not recorder_db_url.startswith("mysql://"):
# This problem only happens on MySQL
return
await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass)
instance = get_instance(hass)
@ -59,6 +58,8 @@ async def test_validate_db_schema_fix_utf8_issue_good_schema(
assert schema_errors == set()
@pytest.mark.skip_on_db_engine(["postgresql", "sqlite"])
@pytest.mark.usefixtures("skip_by_db_engine")
async def test_validate_db_schema_fix_utf8_issue_with_broken_schema(
async_setup_recorder_instance: RecorderInstanceGenerator,
hass: HomeAssistant,
@ -66,9 +67,6 @@ async def test_validate_db_schema_fix_utf8_issue_with_broken_schema(
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test validating DB schema with MySQL when the schema is broken and repairing it."""
if not recorder_db_url.startswith("mysql://"):
# This problem only happens on MySQL
return
await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass)
instance = get_instance(hass)
@ -102,6 +100,8 @@ async def test_validate_db_schema_fix_utf8_issue_with_broken_schema(
assert schema_errors == set()
@pytest.mark.skip_on_db_engine(["postgresql", "sqlite"])
@pytest.mark.usefixtures("skip_by_db_engine")
async def test_validate_db_schema_fix_incorrect_collation(
async_setup_recorder_instance: RecorderInstanceGenerator,
hass: HomeAssistant,
@ -109,9 +109,6 @@ async def test_validate_db_schema_fix_incorrect_collation(
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test validating DB schema with MySQL when the collation is incorrect."""
if not recorder_db_url.startswith("mysql://"):
# This problem only happens on MySQL
return
await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass)
instance = get_instance(hass)
@ -144,6 +141,8 @@ async def test_validate_db_schema_fix_incorrect_collation(
assert schema_errors == set()
@pytest.mark.skip_on_db_engine(["postgresql", "sqlite"])
@pytest.mark.usefixtures("skip_by_db_engine")
async def test_validate_db_schema_precision_correct_collation(
async_setup_recorder_instance: RecorderInstanceGenerator,
hass: HomeAssistant,
@ -151,9 +150,6 @@ async def test_validate_db_schema_precision_correct_collation(
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test validating DB schema when the schema is correct with the correct collation."""
if not recorder_db_url.startswith("mysql://"):
# This problem only happens on MySQL
return
await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass)
instance = get_instance(hass)
@ -165,6 +161,8 @@ async def test_validate_db_schema_precision_correct_collation(
assert schema_errors == set()
@pytest.mark.skip_on_db_engine(["postgresql", "sqlite"])
@pytest.mark.usefixtures("skip_by_db_engine")
async def test_validate_db_schema_fix_utf8_issue_with_broken_schema_unrepairable(
async_setup_recorder_instance: RecorderInstanceGenerator,
hass: HomeAssistant,
@ -172,9 +170,6 @@ async def test_validate_db_schema_fix_utf8_issue_with_broken_schema_unrepairable
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test validating DB schema with MySQL when the schema is broken and cannot be repaired."""
if not recorder_db_url.startswith("mysql://"):
# This problem only happens on MySQL
return
await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass)
instance = get_instance(hass)
@ -206,6 +201,8 @@ async def test_validate_db_schema_fix_utf8_issue_with_broken_schema_unrepairable
assert "Error when validating DB schema" in caplog.text
@pytest.mark.skip_on_db_engine(["sqlite"])
@pytest.mark.usefixtures("skip_by_db_engine")
async def test_validate_db_schema_precision_good_schema(
async_setup_recorder_instance: RecorderInstanceGenerator,
hass: HomeAssistant,
@ -213,9 +210,6 @@ async def test_validate_db_schema_precision_good_schema(
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test validating DB schema when the schema is correct."""
if not recorder_db_url.startswith(("mysql://", "postgresql://")):
# This problem only happens on MySQL and PostgreSQL
return
await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass)
instance = get_instance(hass)
@ -227,6 +221,8 @@ async def test_validate_db_schema_precision_good_schema(
assert schema_errors == set()
@pytest.mark.skip_on_db_engine(["sqlite"])
@pytest.mark.usefixtures("skip_by_db_engine")
async def test_validate_db_schema_precision_with_broken_schema(
async_setup_recorder_instance: RecorderInstanceGenerator,
hass: HomeAssistant,
@ -234,9 +230,6 @@ async def test_validate_db_schema_precision_with_broken_schema(
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test validating DB schema when the schema is broken and than repair it."""
if not recorder_db_url.startswith(("mysql://", "postgresql://")):
# This problem only happens on MySQL and PostgreSQL
return
await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass)
instance = get_instance(hass)
@ -275,6 +268,8 @@ async def test_validate_db_schema_precision_with_broken_schema(
assert schema_errors == set()
@pytest.mark.skip_on_db_engine(["postgresql", "sqlite"])
@pytest.mark.usefixtures("skip_by_db_engine")
async def test_validate_db_schema_precision_with_unrepairable_broken_schema(
async_setup_recorder_instance: RecorderInstanceGenerator,
hass: HomeAssistant,
@ -282,9 +277,6 @@ async def test_validate_db_schema_precision_with_unrepairable_broken_schema(
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test validating DB schema when the schema is broken and cannot be repaired."""
if not recorder_db_url.startswith("mysql://"):
# This problem only happens on MySQL
return
await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass)
instance = get_instance(hass)

View File

@ -9,6 +9,30 @@ from homeassistant.components import recorder
from homeassistant.core import HomeAssistant
def pytest_configure(config):
"""Add custom skip_on_db_engine marker."""
config.addinivalue_line(
"markers",
"skip_on_db_engine(engine): mark test to run only on named DB engine(s)",
)
@pytest.fixture
def skip_by_db_engine(request: pytest.FixtureRequest, recorder_db_url: str) -> None:
"""Fixture to skip tests on unsupported DB engines.
Mark the test with @pytest.mark.skip_on_db_engine("mysql") to skip on mysql, or
@pytest.mark.skip_on_db_engine(["mysql", "sqlite"]) to skip on mysql and sqlite.
"""
if request.node.get_closest_marker("skip_on_db_engine"):
skip_on_db_engine = request.node.get_closest_marker("skip_on_db_engine").args[0]
if isinstance(skip_on_db_engine, str):
skip_on_db_engine = [skip_on_db_engine]
db_engine = recorder_db_url.partition("://")[0]
if db_engine in skip_on_db_engine:
pytest.skip(f"skipped for DB engine: {db_engine}")
@pytest.fixture
def recorder_dialect_name(hass: HomeAssistant, db_engine: str) -> Generator[None]:
"""Patch the recorder dialect."""

View File

@ -891,14 +891,17 @@ def record_states(
return zero, four, states
@pytest.mark.skip_on_db_engine(["mysql", "postgresql"])
@pytest.mark.usefixtures("skip_by_db_engine")
async def test_state_changes_during_period_query_during_migration_to_schema_25(
hass: HomeAssistant,
recorder_db_url: str,
) -> None:
"""Test we can query data prior to schema 25 and during migration to schema 25."""
if recorder_db_url.startswith(("mysql://", "postgresql://")):
# This test doesn't run on MySQL / MariaDB / Postgresql; we can't drop table state_attributes
return
"""Test we can query data prior to schema 25 and during migration to schema 25.
This test doesn't run on MySQL / MariaDB / Postgresql; we can't drop the
state_attributes table.
"""
instance = recorder.get_instance(hass)
@ -957,14 +960,17 @@ async def test_state_changes_during_period_query_during_migration_to_schema_25(
assert state.attributes == {"name": "the light"}
@pytest.mark.skip_on_db_engine(["mysql", "postgresql"])
@pytest.mark.usefixtures("skip_by_db_engine")
async def test_get_states_query_during_migration_to_schema_25(
hass: HomeAssistant,
recorder_db_url: str,
) -> None:
"""Test we can query data prior to schema 25 and during migration to schema 25."""
if recorder_db_url.startswith(("mysql://", "postgresql://")):
# This test doesn't run on MySQL / MariaDB / Postgresql; we can't drop table state_attributes
return
"""Test we can query data prior to schema 25 and during migration to schema 25.
This test doesn't run on MySQL / MariaDB / Postgresql; we can't drop the
state_attributes table.
"""
instance = recorder.get_instance(hass)
@ -1007,14 +1013,17 @@ async def test_get_states_query_during_migration_to_schema_25(
assert state.attributes == {"name": "the light"}
@pytest.mark.skip_on_db_engine(["mysql", "postgresql"])
@pytest.mark.usefixtures("skip_by_db_engine")
async def test_get_states_query_during_migration_to_schema_25_multiple_entities(
hass: HomeAssistant,
recorder_db_url: str,
) -> None:
"""Test we can query data prior to schema 25 and during migration to schema 25."""
if recorder_db_url.startswith(("mysql://", "postgresql://")):
# This test doesn't run on MySQL / MariaDB / Postgresql; we can't drop table state_attributes
return
"""Test we can query data prior to schema 25 and during migration to schema 25.
This test doesn't run on MySQL / MariaDB / Postgresql; we can't drop the
state_attributes table.
"""
instance = recorder.get_instance(hass)

View File

@ -893,14 +893,17 @@ def record_states(
return zero, four, states
@pytest.mark.skip_on_db_engine(["mysql", "postgresql"])
@pytest.mark.usefixtures("skip_by_db_engine")
async def test_state_changes_during_period_query_during_migration_to_schema_25(
hass: HomeAssistant,
recorder_db_url: str,
) -> None:
"""Test we can query data prior to schema 25 and during migration to schema 25."""
if recorder_db_url.startswith(("mysql://", "postgresql://")):
# This test doesn't run on MySQL / MariaDB / Postgresql; we can't drop table state_attributes
return
"""Test we can query data prior to schema 25 and during migration to schema 25.
This test doesn't run on MySQL / MariaDB / Postgresql; we can't drop the
state_attributes table.
"""
instance = recorder.get_instance(hass)
@ -959,14 +962,17 @@ async def test_state_changes_during_period_query_during_migration_to_schema_25(
assert state.attributes == {"name": "the light"}
@pytest.mark.skip_on_db_engine(["mysql", "postgresql"])
@pytest.mark.usefixtures("skip_by_db_engine")
async def test_get_states_query_during_migration_to_schema_25(
hass: HomeAssistant,
recorder_db_url: str,
) -> None:
"""Test we can query data prior to schema 25 and during migration to schema 25."""
if recorder_db_url.startswith(("mysql://", "postgresql://")):
# This test doesn't run on MySQL / MariaDB / Postgresql; we can't drop table state_attributes
return
"""Test we can query data prior to schema 25 and during migration to schema 25.
This test doesn't run on MySQL / MariaDB / Postgresql; we can't drop the
state_attributes table.
"""
instance = recorder.get_instance(hass)
@ -1009,14 +1015,17 @@ async def test_get_states_query_during_migration_to_schema_25(
assert state.attributes == {"name": "the light"}
@pytest.mark.skip_on_db_engine(["mysql", "postgresql"])
@pytest.mark.usefixtures("skip_by_db_engine")
async def test_get_states_query_during_migration_to_schema_25_multiple_entities(
hass: HomeAssistant,
recorder_db_url: str,
) -> None:
"""Test we can query data prior to schema 25 and during migration to schema 25."""
if recorder_db_url.startswith(("mysql://", "postgresql://")):
# This test doesn't run on MySQL / MariaDB / Postgresql; we can't drop table state_attributes
return
"""Test we can query data prior to schema 25 and during migration to schema 25.
This test doesn't run on MySQL / MariaDB / Postgresql; we can't drop the
state_attributes table.
"""
instance = recorder.get_instance(hass)

View File

@ -1815,16 +1815,18 @@ async def test_entity_id_filter(
assert len(db_events) == idx + 1, data
@pytest.mark.skip_on_db_engine(["mysql", "postgresql"])
@pytest.mark.usefixtures("skip_by_db_engine")
async def test_database_lock_and_unlock(
hass: HomeAssistant,
async_setup_recorder_instance: RecorderInstanceGenerator,
recorder_db_url: str,
tmp_path: Path,
) -> None:
"""Test writing events during lock getting written after unlocking."""
if recorder_db_url.startswith(("mysql://", "postgresql://")):
# Database locking is only used for SQLite
return
"""Test writing events during lock getting written after unlocking.
This test is specific for SQLite: Locking is not implemented for other engines.
"""
if recorder_db_url == "sqlite://":
# Use file DB, in memory DB cannot do write locks.
@ -1869,6 +1871,8 @@ async def test_database_lock_and_unlock(
assert len(db_events) == 1
@pytest.mark.skip_on_db_engine(["mysql", "postgresql"])
@pytest.mark.usefixtures("skip_by_db_engine")
async def test_database_lock_and_overflow(
hass: HomeAssistant,
async_setup_recorder_instance: RecorderInstanceGenerator,
@ -1877,10 +1881,10 @@ async def test_database_lock_and_overflow(
caplog: pytest.LogCaptureFixture,
issue_registry: ir.IssueRegistry,
) -> None:
"""Test writing events during lock leading to overflow the queue causes the database to unlock."""
if recorder_db_url.startswith(("mysql://", "postgresql://")):
# Database locking is only used for SQLite
return pytest.skip("Database locking is only used for SQLite")
"""Test writing events during lock leading to overflow the queue causes the database to unlock.
This test is specific for SQLite: Locking is not implemented for other engines.
"""
# Use file DB, in memory DB cannot do write locks.
if recorder_db_url == "sqlite://":
@ -1935,6 +1939,8 @@ async def test_database_lock_and_overflow(
assert start_time.count(":") == 2
@pytest.mark.skip_on_db_engine(["mysql", "postgresql"])
@pytest.mark.usefixtures("skip_by_db_engine")
async def test_database_lock_and_overflow_checks_available_memory(
hass: HomeAssistant,
async_setup_recorder_instance: RecorderInstanceGenerator,
@ -1943,9 +1949,10 @@ async def test_database_lock_and_overflow_checks_available_memory(
caplog: pytest.LogCaptureFixture,
issue_registry: ir.IssueRegistry,
) -> None:
"""Test writing events during lock leading to overflow the queue causes the database to unlock."""
if recorder_db_url.startswith(("mysql://", "postgresql://")):
return pytest.skip("Database locking is only used for SQLite")
"""Test writing events during lock leading to overflow the queue causes the database to unlock.
This test is specific for SQLite: Locking is not implemented for other engines.
"""
# Use file DB, in memory DB cannot do write locks.
if recorder_db_url == "sqlite://":
@ -2025,13 +2032,15 @@ async def test_database_lock_and_overflow_checks_available_memory(
assert start_time.count(":") == 2
@pytest.mark.skip_on_db_engine(["mysql", "postgresql"])
@pytest.mark.usefixtures("skip_by_db_engine")
async def test_database_lock_timeout(
hass: HomeAssistant, setup_recorder: None, recorder_db_url: str
) -> None:
"""Test locking database timeout when recorder stopped."""
if recorder_db_url.startswith(("mysql://", "postgresql://")):
# This test is specific for SQLite: Locking is not implemented for other engines
return
"""Test locking database timeout when recorder stopped.
This test is specific for SQLite: Locking is not implemented for other engines.
"""
hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP)
@ -2099,16 +2108,18 @@ async def test_database_connection_keep_alive(
assert "Sending keepalive" in caplog.text
@pytest.mark.skip_on_db_engine(["mysql", "postgresql"])
@pytest.mark.usefixtures("skip_by_db_engine")
async def test_database_connection_keep_alive_disabled_on_sqlite(
hass: HomeAssistant,
async_setup_recorder_instance: RecorderInstanceGenerator,
caplog: pytest.LogCaptureFixture,
recorder_db_url: str,
) -> None:
"""Test we do not do keep alive for sqlite."""
if recorder_db_url.startswith(("mysql://", "postgresql://")):
# This test is specific for SQLite, keepalive runs on other engines
return
"""Test we do not do keep alive for sqlite.
This test is specific for SQLite, keepalive runs on other engines.
"""
instance = await async_setup_recorder_instance(hass)
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)

View File

@ -154,16 +154,18 @@ async def test_database_migration_failed(
assert len(mock_dismiss.mock_calls) == 1
@pytest.mark.skip_on_db_engine(["mysql", "postgresql"])
@pytest.mark.usefixtures("skip_by_db_engine")
async def test_database_migration_encounters_corruption(
hass: HomeAssistant,
recorder_db_url: str,
async_setup_recorder_instance: RecorderInstanceGenerator,
) -> None:
"""Test we move away the database if its corrupt."""
if recorder_db_url.startswith(("mysql://", "postgresql://")):
# This test is specific for SQLite, wiping the database on error only happens
# with SQLite.
return
"""Test we move away the database if its corrupt.
This test is specific for SQLite, wiping the database on error only happens
with SQLite.
"""
assert recorder.util.async_migration_in_progress(hass) is False
@ -610,12 +612,13 @@ def test_raise_if_exception_missing_empty_cause_str() -> None:
migration.raise_if_exception_missing_str(programming_exc, ["not present"])
@pytest.mark.skip_on_db_engine(["mysql", "postgresql"])
@pytest.mark.usefixtures("skip_by_db_engine")
def test_rebuild_sqlite_states_table(recorder_db_url: str) -> None:
"""Test that we can rebuild the states table in SQLite."""
if not recorder_db_url.startswith("sqlite://"):
# This test is specific for SQLite
return
"""Test that we can rebuild the states table in SQLite.
This test is specific for SQLite.
"""
engine = create_engine(recorder_db_url)
session_maker = scoped_session(sessionmaker(bind=engine, future=True))
with session_scope(session=session_maker()) as session:
@ -633,14 +636,15 @@ def test_rebuild_sqlite_states_table(recorder_db_url: str) -> None:
engine.dispose()
@pytest.mark.skip_on_db_engine(["mysql", "postgresql"])
@pytest.mark.usefixtures("skip_by_db_engine")
def test_rebuild_sqlite_states_table_missing_fails(
recorder_db_url: str, caplog: pytest.LogCaptureFixture
) -> None:
"""Test handling missing states table when attempting rebuild."""
if not recorder_db_url.startswith("sqlite://"):
# This test is specific for SQLite
return
"""Test handling missing states table when attempting rebuild.
This test is specific for SQLite.
"""
engine = create_engine(recorder_db_url)
session_maker = scoped_session(sessionmaker(bind=engine, future=True))
with session_scope(session=session_maker()) as session:
@ -667,14 +671,15 @@ def test_rebuild_sqlite_states_table_missing_fails(
engine.dispose()
@pytest.mark.skip_on_db_engine(["mysql", "postgresql"])
@pytest.mark.usefixtures("skip_by_db_engine")
def test_rebuild_sqlite_states_table_extra_columns(
recorder_db_url: str, caplog: pytest.LogCaptureFixture
) -> None:
"""Test handling extra columns when rebuilding the states table."""
if not recorder_db_url.startswith("sqlite://"):
# This test is specific for SQLite
return
"""Test handling extra columns when rebuilding the states table.
This test is specific for SQLite.
"""
engine = create_engine(recorder_db_url)
session_maker = scoped_session(sessionmaker(bind=engine, future=True))
with session_scope(session=session_maker()) as session:

View File

@ -62,6 +62,8 @@ def _create_engine_test(*args, **kwargs):
return engine
@pytest.mark.skip_on_db_engine(["mysql", "postgresql"])
@pytest.mark.usefixtures("skip_by_db_engine")
@pytest.mark.parametrize("enable_migrate_context_ids", [True])
async def test_migration_changes_prevent_trying_to_migrate_again(
async_setup_recorder_instance: RecorderInstanceGenerator,
@ -75,11 +77,9 @@ async def test_migration_changes_prevent_trying_to_migrate_again(
1. With schema 32 to populate the data
2. With current schema so the migration happens
3. With current schema to verify we do not have to query to see if the migration is done
This test uses a test database between runs so its SQLite specific. WHY, this makes no sense.???
"""
if recorder_db_url.startswith(("mysql://", "postgresql://")):
# This test uses a test database between runs so its
# SQLite specific
return
config = {
recorder.CONF_DB_URL: "sqlite:///" + str(tmp_path / "pytest.db"),

View File

@ -193,16 +193,18 @@ async def test_purge_old_states(
assert state_attributes.count() == 3
@pytest.mark.skip_on_db_engine(["mysql", "postgresql"])
@pytest.mark.usefixtures("skip_by_db_engine")
async def test_purge_old_states_encouters_database_corruption(
async_setup_recorder_instance: RecorderInstanceGenerator,
hass: HomeAssistant,
recorder_db_url: str,
) -> None:
"""Test database image image is malformed while deleting old states."""
if recorder_db_url.startswith(("mysql://", "postgresql://")):
# This test is specific for SQLite, wiping the database on error only happens
# with SQLite.
return
"""Test database image image is malformed while deleting old states.
This test is specific for SQLite, wiping the database on error only happens
with SQLite.
"""
await async_setup_recorder_instance(hass)

View File

@ -155,16 +155,18 @@ async def test_purge_old_states(
assert state_attributes.count() == 3
@pytest.mark.skip_on_db_engine(["mysql", "postgresql"])
@pytest.mark.usefixtures("skip_by_db_engine")
async def test_purge_old_states_encouters_database_corruption(
async_setup_recorder_instance: RecorderInstanceGenerator,
hass: HomeAssistant,
recorder_db_url: str,
) -> None:
"""Test database image image is malformed while deleting old states."""
if recorder_db_url.startswith(("mysql://", "postgresql://")):
# This test is specific for SQLite, wiping the database on error only happens
# with SQLite.
return
"""Test database image image is malformed while deleting old states.
This test is specific for SQLite, wiping the database on error only happens
with SQLite.
"""
await async_setup_recorder_instance(hass)
await async_attach_db_engine(hass)
@ -1097,14 +1099,17 @@ async def test_purge_can_mix_legacy_and_new_format(
assert states_without_event_id.count() == 1
@pytest.mark.skip_on_db_engine(["mysql", "postgresql"])
@pytest.mark.usefixtures("skip_by_db_engine")
async def test_purge_can_mix_legacy_and_new_format_with_detached_state(
async_setup_recorder_instance: RecorderInstanceGenerator,
hass: HomeAssistant,
recorder_db_url: str,
) -> None:
"""Test purging with legacy and new events with a detached state."""
if recorder_db_url.startswith(("mysql://", "postgresql://")):
return pytest.skip("This tests disables foreign key checks on SQLite")
"""Test purging with legacy and new events with a detached state.
This tests disables foreign key checks on SQLite.
"""
instance = await async_setup_recorder_instance(hass)
await async_attach_db_engine(hass)

View File

@ -15,13 +15,15 @@ from tests.common import get_system_health_info
from tests.typing import RecorderInstanceGenerator
@pytest.mark.skip_on_db_engine(["mysql", "postgresql"])
@pytest.mark.usefixtures("skip_by_db_engine")
async def test_recorder_system_health(
recorder_mock: Recorder, hass: HomeAssistant, recorder_db_url: str
) -> None:
"""Test recorder system health."""
if recorder_db_url.startswith(("mysql://", "postgresql://")):
# This test is specific for SQLite
return
"""Test recorder system health.
This test is specific for SQLite.
"""
assert await async_setup_component(hass, "system_health", {})
await async_wait_recording_done(hass)
@ -100,15 +102,17 @@ async def test_recorder_system_health_db_url_missing_host(
}
@pytest.mark.skip_on_db_engine(["mysql", "postgresql"])
@pytest.mark.usefixtures("skip_by_db_engine")
async def test_recorder_system_health_crashed_recorder_runs_table(
async_setup_recorder_instance: RecorderInstanceGenerator,
hass: HomeAssistant,
recorder_db_url: str,
) -> None:
"""Test recorder system health with crashed recorder runs table."""
if recorder_db_url.startswith(("mysql://", "postgresql://")):
# This test is specific for SQLite
return
"""Test recorder system health with crashed recorder runs table.
This test is specific for SQLite.
"""
with patch(
"homeassistant.components.recorder.table_managers.recorder_runs.RecorderRunsManager.load_from_db"

View File

@ -719,14 +719,15 @@ async def test_no_issue_for_mariadb_with_MDEV_25020(
assert database_engine.optimizer.slow_range_in_select is False
@pytest.mark.skip_on_db_engine(["mysql", "postgresql"])
@pytest.mark.usefixtures("skip_by_db_engine")
async def test_basic_sanity_check(
hass: HomeAssistant, setup_recorder: None, recorder_db_url: str
) -> None:
"""Test the basic sanity checks with a missing table."""
if recorder_db_url.startswith(("mysql://", "postgresql://")):
# This test is specific for SQLite
return
"""Test the basic sanity checks with a missing table.
This test is specific for SQLite.
"""
cursor = util.get_instance(hass).engine.raw_connection().cursor()
assert util.basic_sanity_check(cursor) is True
@ -737,17 +738,18 @@ async def test_basic_sanity_check(
util.basic_sanity_check(cursor)
@pytest.mark.skip_on_db_engine(["mysql", "postgresql"])
@pytest.mark.usefixtures("skip_by_db_engine")
async def test_combined_checks(
hass: HomeAssistant,
setup_recorder: None,
caplog: pytest.LogCaptureFixture,
recorder_db_url: str,
) -> None:
"""Run Checks on the open database."""
if recorder_db_url.startswith(("mysql://", "postgresql://")):
# This test is specific for SQLite
return
"""Run Checks on the open database.
This test is specific for SQLite.
"""
instance = util.get_instance(hass)
instance.db_retry_wait = 0
@ -829,14 +831,15 @@ async def test_end_incomplete_runs(
assert "Ended unfinished session" in caplog.text
@pytest.mark.skip_on_db_engine(["mysql", "postgresql"])
@pytest.mark.usefixtures("skip_by_db_engine")
async def test_periodic_db_cleanups(
hass: HomeAssistant, setup_recorder: None, recorder_db_url: str
) -> None:
"""Test periodic db cleanups."""
if recorder_db_url.startswith(("mysql://", "postgresql://")):
# This test is specific for SQLite
return
"""Test periodic db cleanups.
This test is specific for SQLite.
"""
with patch.object(util.get_instance(hass).engine, "connect") as connect_mock:
util.periodic_db_cleanups(util.get_instance(hass))