diff --git a/homeassistant/components/recorder/core.py b/homeassistant/components/recorder/core.py
index a32af55720b..664fb59644f 100644
--- a/homeassistant/components/recorder/core.py
+++ b/homeassistant/components/recorder/core.py
@@ -925,7 +925,7 @@ class Recorder(threading.Thread):
             # that is pending before running the task
             if TYPE_CHECKING:
                 assert isinstance(task, RecorderTask)
-            if not task.commit_before:
+            if task.commit_before:
                 self._commit_event_session_or_retry()
             return task.run(self)
         except exc.DatabaseError as err:
diff --git a/homeassistant/components/recorder/entity_registry.py b/homeassistant/components/recorder/entity_registry.py
index cb464c7f543..89e6864cb06 100644
--- a/homeassistant/components/recorder/entity_registry.py
+++ b/homeassistant/components/recorder/entity_registry.py
@@ -7,7 +7,7 @@ from homeassistant.helpers import entity_registry as er
 from homeassistant.helpers.start import async_at_start
 
 from .core import Recorder
-from .util import get_instance, session_scope
+from .util import filter_unique_constraint_integrity_error, get_instance, session_scope
 
 _LOGGER = logging.getLogger(__name__)
 
@@ -62,7 +62,10 @@ def update_states_metadata(
         )
         return
 
-    with session_scope(session=instance.get_session()) as session:
+    with session_scope(
+        session=instance.get_session(),
+        exception_filter=filter_unique_constraint_integrity_error(instance, "state"),
+    ) as session:
         if not states_meta_manager.update_metadata(session, entity_id, new_entity_id):
             _LOGGER.warning(
                 "Cannot migrate history for entity_id `%s` to `%s` "
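The core.py hunk above fixes an inverted condition: tasks that declare `commit_before = True` now get the pending event session committed before they run, where previously the commit happened only when the flag was `False`. A minimal, self-contained sketch of this dispatch pattern, using invented names (`MiniRecorder`, `AssertFlushed`) rather than the real recorder classes:

```python
from dataclasses import dataclass, field


@dataclass
class Task:
    """Base task; commit_before asks the runner to flush pending writes first."""

    commit_before: bool = True

    def run(self, recorder: "MiniRecorder") -> None:
        raise NotImplementedError


@dataclass
class MiniRecorder:
    pending: list[str] = field(default_factory=list)
    committed: list[str] = field(default_factory=list)

    def commit(self) -> None:
        self.committed.extend(self.pending)
        self.pending.clear()

    def process(self, task: Task) -> None:
        # The fixed condition: commit when the task asks for it. The old
        # inverted check committed only when commit_before was False.
        if task.commit_before:
            self.commit()
        task.run(self)


class AssertFlushed(Task):
    def run(self, recorder: MiniRecorder) -> None:
        assert not recorder.pending, "expected pending writes to be committed"


mini = MiniRecorder(pending=["state-row"])
mini.process(AssertFlushed())  # passes only with the corrected condition
print(mini.committed)  # ['state-row']
```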
diff --git a/homeassistant/components/recorder/statistics.py b/homeassistant/components/recorder/statistics.py
index 2c26328fa2d..ffe3136f841 100644
--- a/homeassistant/components/recorder/statistics.py
+++ b/homeassistant/components/recorder/statistics.py
@@ -4,7 +4,6 @@ from __future__ import annotations
 
 from collections import defaultdict
 from collections.abc import Callable, Iterable, Sequence
-import contextlib
 import dataclasses
 from datetime import datetime, timedelta
 from functools import lru_cache, partial
@@ -16,7 +15,7 @@ from typing import TYPE_CHECKING, Any, Literal, TypedDict, cast
 
 from sqlalchemy import Select, and_, bindparam, func, lambda_stmt, select, text
 from sqlalchemy.engine.row import Row
-from sqlalchemy.exc import SQLAlchemyError, StatementError
+from sqlalchemy.exc import SQLAlchemyError
 from sqlalchemy.orm.session import Session
 from sqlalchemy.sql.lambdas import StatementLambdaElement
 import voluptuous as vol
@@ -73,6 +72,7 @@ from .models import (
 from .util import (
     execute,
     execute_stmt_lambda_element,
+    filter_unique_constraint_integrity_error,
     get_instance,
     retryable_database_job,
     session_scope,
@@ -455,7 +455,9 @@ def compile_missing_statistics(instance: Recorder) -> bool:
 
     with session_scope(
         session=instance.get_session(),
-        exception_filter=_filter_unique_constraint_integrity_error(instance),
+        exception_filter=filter_unique_constraint_integrity_error(
+            instance, "statistic"
+        ),
     ) as session:
         # Find the newest statistics run, if any
         if last_run := session.query(func.max(StatisticsRuns.start)).scalar():
@@ -487,7 +489,9 @@ def compile_statistics(instance: Recorder, start: datetime, fire_events: bool) -> bool:
     # Return if we already have 5-minute statistics for the requested period
     with session_scope(
         session=instance.get_session(),
-        exception_filter=_filter_unique_constraint_integrity_error(instance),
+        exception_filter=filter_unique_constraint_integrity_error(
+            instance, "statistic"
+        ),
     ) as session:
         modified_statistic_ids = _compile_statistics(
             instance, session, start, fire_events
@@ -738,7 +742,9 @@ def update_statistics_metadata(
     if new_statistic_id is not UNDEFINED and new_statistic_id is not None:
         with session_scope(
             session=instance.get_session(),
-            exception_filter=_filter_unique_constraint_integrity_error(instance),
+            exception_filter=filter_unique_constraint_integrity_error(
+                instance, "statistic"
+            ),
         ) as session:
             statistics_meta_manager.update_statistic_id(
                 session, DOMAIN, statistic_id, new_statistic_id
@@ -2247,54 +2253,6 @@ def async_add_external_statistics(
     _async_import_statistics(hass, metadata, statistics)
 
 
-def _filter_unique_constraint_integrity_error(
-    instance: Recorder,
-) -> Callable[[Exception], bool]:
-    def _filter_unique_constraint_integrity_error(err: Exception) -> bool:
-        """Handle unique constraint integrity errors."""
-        if not isinstance(err, StatementError):
-            return False
-
-        assert instance.engine is not None
-        dialect_name = instance.engine.dialect.name
-
-        ignore = False
-        if (
-            dialect_name == SupportedDialect.SQLITE
-            and "UNIQUE constraint failed" in str(err)
-        ):
-            ignore = True
-        if (
-            dialect_name == SupportedDialect.POSTGRESQL
-            and err.orig
-            and hasattr(err.orig, "pgcode")
-            and err.orig.pgcode == "23505"
-        ):
-            ignore = True
-        if (
-            dialect_name == SupportedDialect.MYSQL
-            and err.orig
-            and hasattr(err.orig, "args")
-        ):
-            with contextlib.suppress(TypeError):
-                if err.orig.args[0] == 1062:
-                    ignore = True
-
-        if ignore:
-            _LOGGER.warning(
-                (
-                    "Blocked attempt to insert duplicated statistic rows, please report"
-                    " at %s"
-                ),
-                "https://github.com/home-assistant/core/issues?q=is%3Aopen+is%3Aissue+label%3A%22integration%3A+recorder%22",
-                exc_info=err,
-            )
-
-        return ignore
-
-    return _filter_unique_constraint_integrity_error
-
-
 def _import_statistics_with_session(
     instance: Recorder,
     session: Session,
@@ -2398,7 +2356,9 @@ def import_statistics(
 
     with session_scope(
         session=instance.get_session(),
-        exception_filter=_filter_unique_constraint_integrity_error(instance),
+        exception_filter=filter_unique_constraint_integrity_error(
+            instance, "statistic"
+        ),
     ) as session:
         return _import_statistics_with_session(
             instance, session, metadata, statistics, table
diff --git a/homeassistant/components/recorder/table_managers/statistics_meta.py b/homeassistant/components/recorder/table_managers/statistics_meta.py
index e216cb79987..32e989b0e3d 100644
--- a/homeassistant/components/recorder/table_managers/statistics_meta.py
+++ b/homeassistant/components/recorder/table_managers/statistics_meta.py
@@ -308,11 +308,18 @@ class StatisticsMetaManager:
         recorder thread.
         """
         self._assert_in_recorder_thread()
+        if self.get(session, new_statistic_id):
+            _LOGGER.error(
+                "Cannot rename statistic_id `%s` to `%s` because the new statistic_id is already in use",
+                old_statistic_id,
+                new_statistic_id,
+            )
+            return
         session.query(StatisticsMeta).filter(
             (StatisticsMeta.statistic_id == old_statistic_id)
             & (StatisticsMeta.source == source)
         ).update({StatisticsMeta.statistic_id: new_statistic_id})
-        self._clear_cache([old_statistic_id, new_statistic_id])
+        self._clear_cache([old_statistic_id])
 
     def delete(self, session: Session, statistic_ids: list[str]) -> None:
         """Clear statistics for a list of statistic_ids.
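The statistics_meta.py change adds a pre-flight lookup so renaming onto an occupied statistic_id is rejected with a log message instead of tripping the table's unique constraint mid-session. A rough standalone illustration of the check-before-update pattern, using the sqlite3 stdlib and a simplified stand-in for the statistics_meta table:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE statistics_meta (statistic_id TEXT UNIQUE)")
conn.executemany(
    "INSERT INTO statistics_meta VALUES (?)",
    [("sensor.test1",), ("sensor.test99",)],
)


def update_statistic_id(old_id: str, new_id: str) -> None:
    # Safeguard: refuse the rename when the target id already exists,
    # instead of letting the UNIQUE constraint abort the whole session.
    row = conn.execute(
        "SELECT 1 FROM statistics_meta WHERE statistic_id = ?", (new_id,)
    ).fetchone()
    if row:
        print(f"Cannot rename `{old_id}` to `{new_id}`: already in use")
        return
    conn.execute(
        "UPDATE statistics_meta SET statistic_id = ? WHERE statistic_id = ?",
        (new_id, old_id),
    )


update_statistic_id("sensor.test1", "sensor.test99")  # blocked by the safeguard
update_statistic_id("sensor.test1", "sensor.test2")  # succeeds
```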
diff --git a/homeassistant/components/recorder/util.py b/homeassistant/components/recorder/util.py
index 67cf33ee591..0b9ec0cf68c 100644
--- a/homeassistant/components/recorder/util.py
+++ b/homeassistant/components/recorder/util.py
@@ -3,6 +3,7 @@ from __future__ import annotations
 
 from collections.abc import Callable, Collection, Generator, Iterable, Sequence
+import contextlib
 from contextlib import contextmanager
 from datetime import date, datetime, timedelta
 import functools
@@ -22,7 +23,7 @@ import ciso8601
 from sqlalchemy import inspect, text
 from sqlalchemy.engine import Result, Row
 from sqlalchemy.engine.interfaces import DBAPIConnection
-from sqlalchemy.exc import OperationalError, SQLAlchemyError
+from sqlalchemy.exc import OperationalError, SQLAlchemyError, StatementError
 from sqlalchemy.orm.query import Query
 from sqlalchemy.orm.session import Session
 from sqlalchemy.sql.lambdas import StatementLambdaElement
@@ -907,3 +908,54 @@ def get_index_by_name(session: Session, table_name: str, index_name: str) -> str | None:
         ),
         None,
     )
+
+
+def filter_unique_constraint_integrity_error(
+    instance: Recorder, row_type: str
+) -> Callable[[Exception], bool]:
+    """Create a filter for unique constraint integrity errors."""
+
+    def _filter_unique_constraint_integrity_error(err: Exception) -> bool:
+        """Handle unique constraint integrity errors."""
+        if not isinstance(err, StatementError):
+            return False
+
+        assert instance.engine is not None
+        dialect_name = instance.engine.dialect.name
+
+        ignore = False
+        if (
+            dialect_name == SupportedDialect.SQLITE
+            and "UNIQUE constraint failed" in str(err)
+        ):
+            ignore = True
+        if (
+            dialect_name == SupportedDialect.POSTGRESQL
+            and err.orig
+            and hasattr(err.orig, "pgcode")
+            and err.orig.pgcode == "23505"
+        ):
+            ignore = True
+        if (
+            dialect_name == SupportedDialect.MYSQL
+            and err.orig
+            and hasattr(err.orig, "args")
+        ):
+            with contextlib.suppress(TypeError):
+                if err.orig.args[0] == 1062:
+                    ignore = True
+
+        if ignore:
+            _LOGGER.warning(
+                (
+                    "Blocked attempt to insert duplicated %s rows, please report"
+                    " at %s"
+                ),
+                row_type,
+                "https://github.com/home-assistant/core/issues?q=is%3Aopen+is%3Aissue+label%3A%22integration%3A+recorder%22",
+                exc_info=err,
+            )
+
+        return ignore
+
+    return _filter_unique_constraint_integrity_error
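The relocated `filter_unique_constraint_integrity_error` factory returns a callable that `session_scope` consults when a statement fails: if the filter recognizes the error as a duplicate-row insert for the active dialect, the error is logged and swallowed so the recorder thread keeps running. A simplified model of that contract (not the recorder's actual `session_scope` implementation):

```python
from collections.abc import Callable, Generator
from contextlib import contextmanager


@contextmanager
def session_scope(
    *, exception_filter: Callable[[Exception], bool] | None = None
) -> Generator[list, None, None]:
    session: list = []  # stand-in for a SQLAlchemy Session
    try:
        yield session
        # a real implementation would commit here
    except Exception as err:
        # a real implementation would roll back here
        if not exception_filter or not exception_filter(err):
            raise  # unexpected error: propagate
        # filtered error (e.g. a duplicate-row insert): swallow and move on


def only_value_errors(err: Exception) -> bool:
    return isinstance(err, ValueError)


with session_scope(exception_filter=only_value_errors) as session:
    raise ValueError("duplicate row")  # swallowed by the filter
print("recorder thread keeps running")
```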
diff --git a/tests/components/recorder/test_entity_registry.py b/tests/components/recorder/test_entity_registry.py
index 22989b761dc..37223f206a1 100644
--- a/tests/components/recorder/test_entity_registry.py
+++ b/tests/components/recorder/test_entity_registry.py
@@ -1,11 +1,13 @@
 """The tests for sensor recorder platform."""
 from collections.abc import Callable
+from unittest.mock import patch
 
 import pytest
 from sqlalchemy import select
 from sqlalchemy.orm import Session
 
+from homeassistant.components import recorder
 from homeassistant.components.recorder import history
 from homeassistant.components.recorder.db_schema import StatesMeta
 from homeassistant.components.recorder.util import session_scope
@@ -261,4 +263,101 @@ def test_rename_entity_collision(
         assert _count_entity_id_in_states_meta(hass, session, "sensor.test99") == 1
         assert _count_entity_id_in_states_meta(hass, session, "sensor.test1") == 1
 
+    # We should hit the safeguard in the states_meta_manager
     assert "the new entity_id is already in use" in caplog.text
+
+    # We should not hit the safeguard in the entity_registry
+    assert "Blocked attempt to insert duplicated state rows" not in caplog.text
+
+
+def test_rename_entity_collision_without_states_meta_safeguard(
+    hass_recorder: Callable[..., HomeAssistant], caplog: pytest.LogCaptureFixture
+) -> None:
+    """Test states meta is not migrated when there is a collision.
+
+    This test disables the safeguard in the states_meta_manager
+    and relies on the filter_unique_constraint_integrity_error safeguard.
+    """
+    hass = hass_recorder()
+    setup_component(hass, "sensor", {})
+
+    entity_reg = mock_registry(hass)
+
+    @callback
+    def add_entry():
+        reg_entry = entity_reg.async_get_or_create(
+            "sensor",
+            "test",
+            "unique_0000",
+            suggested_object_id="test1",
+        )
+        assert reg_entry.entity_id == "sensor.test1"
+
+    hass.add_job(add_entry)
+    hass.block_till_done()
+
+    zero, four, states = record_states(hass)
+    hist = history.get_significant_states(
+        hass, zero, four, list(set(states) | {"sensor.test99", "sensor.test1"})
+    )
+    assert_dict_of_states_equal_without_context_and_last_changed(states, hist)
+    assert len(hist["sensor.test1"]) == 3
+
+    hass.states.set("sensor.test99", "collision")
+    hass.states.remove("sensor.test99")
+
+    hass.block_till_done()
+    wait_recording_done(hass)
+
+    # Verify history before collision
+    hist = history.get_significant_states(
+        hass, zero, four, list(set(states) | {"sensor.test99", "sensor.test1"})
+    )
+    assert len(hist["sensor.test1"]) == 3
+    assert len(hist["sensor.test99"]) == 2
+
+    instance = recorder.get_instance(hass)
+    # Patch out the safeguard in the states meta manager
+    # so that we hit the filter_unique_constraint_integrity_error safeguard in the entity_registry
+    with patch.object(instance.states_meta_manager, "get", return_value=None):
+        # Rename entity sensor.test1 to sensor.test99
+        @callback
+        def rename_entry():
+            entity_reg.async_update_entity(
+                "sensor.test1", new_entity_id="sensor.test99"
+            )
+
+        hass.add_job(rename_entry)
+        wait_recording_done(hass)
+
+    # History is not migrated on collision
+    hist = history.get_significant_states(
+        hass, zero, four, list(set(states) | {"sensor.test99", "sensor.test1"})
+    )
+    assert len(hist["sensor.test1"]) == 3
+    assert len(hist["sensor.test99"]) == 2
+
+    with session_scope(hass=hass) as session:
+        assert _count_entity_id_in_states_meta(hass, session, "sensor.test99") == 1
+
+    hass.states.set("sensor.test99", "post_migrate")
+    wait_recording_done(hass)
+
+    new_hist = history.get_significant_states(
+        hass,
+        zero,
+        dt_util.utcnow(),
+        list(set(states) | {"sensor.test99", "sensor.test1"}),
+    )
+    assert new_hist["sensor.test99"][-1].state == "post_migrate"
+    assert len(hist["sensor.test99"]) == 2
+
+    with session_scope(hass=hass) as session:
+        assert _count_entity_id_in_states_meta(hass, session, "sensor.test99") == 1
+        assert _count_entity_id_in_states_meta(hass, session, "sensor.test1") == 1
+
+    # We should not hit the safeguard in the states_meta_manager
+    assert "the new entity_id is already in use" not in caplog.text
+
+    # We should hit the safeguard in the entity_registry
+    assert "Blocked attempt to insert duplicated state rows" in caplog.text
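The new test disables the first line of defense (`states_meta_manager.get`) with `patch.object` so the fallback `filter_unique_constraint_integrity_error` safeguard is the one actually exercised. A minimal standalone illustration of that technique, with hypothetical `Manager` and `first_check` names:

```python
from unittest.mock import patch


class Manager:
    def first_check(self, key: str) -> str | None:
        return "existing-row"  # normally blocks the operation early


def guarded_operation(manager: Manager) -> str:
    if manager.first_check("sensor.test99"):
        return "blocked by first safeguard"
    return "fell through to second safeguard"


manager = Manager()
assert guarded_operation(manager) == "blocked by first safeguard"

# Force a lookup miss, analogous to patching states_meta_manager.get:
with patch.object(manager, "first_check", return_value=None):
    assert guarded_operation(manager) == "fell through to second safeguard"
```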
diff --git a/tests/components/recorder/test_init.py b/tests/components/recorder/test_init.py
index acfe6189af9..96814da5171 100644
--- a/tests/components/recorder/test_init.py
+++ b/tests/components/recorder/test_init.py
@@ -2490,3 +2490,73 @@ async def test_events_are_recorded_until_final_write(
     await hass.async_block_till_done()
 
     assert not instance.engine
+
+
+async def test_commit_before_commits_pending_writes(
+    async_setup_recorder_instance: RecorderInstanceGenerator,
+    hass: HomeAssistant,
+    recorder_db_url: str,
+    tmp_path: Path,
+) -> None:
+    """Test commit_before with a non-zero commit interval.
+
+    All of our tests run with a commit interval of 0 by
+    default, so we need to test this with a non-zero commit interval.
+    """
+    config = {
+        recorder.CONF_DB_URL: recorder_db_url,
+        recorder.CONF_COMMIT_INTERVAL: 60,
+    }
+
+    recorder_helper.async_initialize_recorder(hass)
+    hass.create_task(async_setup_recorder_instance(hass, config))
+    await recorder_helper.async_wait_recorder(hass)
+    instance = get_instance(hass)
+    assert instance.commit_interval == 60
+    verify_states_in_queue_future = hass.loop.create_future()
+    verify_session_commit_future = hass.loop.create_future()
+
+    class VerifyCommitBeforeTask(recorder.tasks.RecorderTask):
+        """Task to verify that commit_before ran.
+
+        If commit_before is true, we should have no pending writes.
+        """
+
+        commit_before = True
+
+        def run(self, instance: Recorder) -> None:
+            if not instance._event_session_has_pending_writes:
+                hass.loop.call_soon_threadsafe(
+                    verify_session_commit_future.set_result, None
+                )
+                return
+            hass.loop.call_soon_threadsafe(
+                verify_session_commit_future.set_exception,
+                RuntimeError("Session still has pending writes"),
+            )
+
+    class VerifyStatesInQueueTask(recorder.tasks.RecorderTask):
+        """Task to verify that states are in the queue."""
+
+        commit_before = False
+
+        def run(self, instance: Recorder) -> None:
+            if instance._event_session_has_pending_writes:
+                hass.loop.call_soon_threadsafe(
+                    verify_states_in_queue_future.set_result, None
+                )
+                return
+            hass.loop.call_soon_threadsafe(
+                verify_states_in_queue_future.set_exception,
+                RuntimeError("Session has no pending writes"),
+            )
+
+    # First insert an event
+    instance.queue_task(Event("fake_event"))
+    # Next verify that the event session has pending writes
+    instance.queue_task(VerifyStatesInQueueTask())
+    # Finally, verify that the session was committed
+    instance.queue_task(VerifyCommitBeforeTask())
+
+    await verify_states_in_queue_future
+    await verify_session_commit_future
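`VerifyCommitBeforeTask` runs on the recorder thread, so it reports back to the event loop with `loop.call_soon_threadsafe` on a future rather than calling `set_result` directly, since asyncio futures are not thread-safe. The same signaling pattern in isolation (the worker name is illustrative):

```python
import asyncio
import threading


async def main() -> None:
    loop = asyncio.get_running_loop()
    done: asyncio.Future[str] = loop.create_future()

    def worker() -> None:
        # Runs in another thread; hand the result back to the loop
        # instead of touching the future directly.
        loop.call_soon_threadsafe(done.set_result, "committed")

    threading.Thread(target=worker).start()
    assert await done == "committed"


asyncio.run(main())
```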
+ """ hass = hass_recorder() setup_component(hass, "sensor", {}) @@ -531,8 +535,117 @@ def test_rename_entity_collision( # Statistics failed to migrate due to the collision stats = statistics_during_period(hass, zero, period="5minute") assert stats == {"sensor.test1": expected_stats1, "sensor.test2": expected_stats2} + + # Verify the safeguard in the states meta manager was hit + assert ( + "Cannot rename statistic_id `sensor.test1` to `sensor.test99` " + "because the new statistic_id is already in use" + ) in caplog.text + + # Verify the filter_unique_constraint_integrity_error safeguard was not hit + assert "Blocked attempt to insert duplicated statistic rows" not in caplog.text + + +def test_rename_entity_collision_states_meta_check_disabled( + hass_recorder: Callable[..., HomeAssistant], caplog: pytest.LogCaptureFixture +) -> None: + """Test statistics is migrated when entity_id is changed. + + This test disables the safeguard in the statistics_meta_manager + and relies on the filter_unique_constraint_integrity_error safeguard. + """ + hass = hass_recorder() + setup_component(hass, "sensor", {}) + + entity_reg = mock_registry(hass) + + @callback + def add_entry(): + reg_entry = entity_reg.async_get_or_create( + "sensor", + "test", + "unique_0000", + suggested_object_id="test1", + ) + assert reg_entry.entity_id == "sensor.test1" + + hass.add_job(add_entry) + hass.block_till_done() + + zero, four, states = record_states(hass) + hist = history.get_significant_states(hass, zero, four, list(states)) + assert_dict_of_states_equal_without_context_and_last_changed(states, hist) + + for kwargs in ({}, {"statistic_ids": ["sensor.test1"]}): + stats = statistics_during_period(hass, zero, period="5minute", **kwargs) + assert stats == {} + stats = get_last_short_term_statistics( + hass, + 0, + "sensor.test1", + True, + {"last_reset", "max", "mean", "min", "state", "sum"}, + ) + assert stats == {} + + do_adhoc_statistics(hass, start=zero) + wait_recording_done(hass) + expected_1 = { + "start": process_timestamp(zero).timestamp(), + "end": process_timestamp(zero + timedelta(minutes=5)).timestamp(), + "mean": pytest.approx(14.915254237288135), + "min": pytest.approx(10.0), + "max": pytest.approx(20.0), + "last_reset": None, + "state": None, + "sum": None, + } + expected_stats1 = [expected_1] + expected_stats2 = [expected_1] + + stats = statistics_during_period(hass, zero, period="5minute") + assert stats == {"sensor.test1": expected_stats1, "sensor.test2": expected_stats2} + + # Insert metadata for sensor.test99 + metadata_1 = { + "has_mean": True, + "has_sum": False, + "name": "Total imported energy", + "source": "test", + "statistic_id": "sensor.test99", + "unit_of_measurement": "kWh", + } + + with session_scope(hass=hass) as session: + session.add(recorder.db_schema.StatisticsMeta.from_meta(metadata_1)) + + instance = recorder.get_instance(hass) + # Patch out the safeguard in the states meta manager + # so that we hit the filter_unique_constraint_integrity_error safeguard in the statistics + with patch.object(instance.statistics_meta_manager, "get", return_value=None): + # Rename entity sensor.test1 to sensor.test99 + @callback + def rename_entry(): + entity_reg.async_update_entity( + "sensor.test1", new_entity_id="sensor.test99" + ) + + hass.add_job(rename_entry) + wait_recording_done(hass) + + # Statistics failed to migrate due to the collision + stats = statistics_during_period(hass, zero, period="5minute") + assert stats == {"sensor.test1": expected_stats1, "sensor.test2": expected_stats2} + + # 
+    # Verify the filter_unique_constraint_integrity_error safeguard was hit
     assert "Blocked attempt to insert duplicated statistic rows" in caplog.text
 
+    # Verify the safeguard in the statistics meta manager was not hit
+    assert (
+        "Cannot rename statistic_id `sensor.test1` to `sensor.test99` "
+        "because the new statistic_id is already in use"
+    ) not in caplog.text
+
 
 def test_statistics_duplicated(
     hass_recorder: Callable[..., HomeAssistant], caplog: pytest.LogCaptureFixture