diff --git a/homeassistant/components/recorder/statistics.py b/homeassistant/components/recorder/statistics.py
index 0bf10ca71c6..6c305242f5f 100644
--- a/homeassistant/components/recorder/statistics.py
+++ b/homeassistant/components/recorder/statistics.py
@@ -290,7 +290,7 @@ def _find_duplicates(
         )
         .filter(subquery.c.is_duplicate == 1)
         .order_by(table.metadata_id, table.start, table.id.desc())
-        .limit(MAX_ROWS_TO_PURGE)
+        .limit(1000 * MAX_ROWS_TO_PURGE)
     )
     duplicates = execute(query)
     original_as_dict = {}
@@ -343,12 +343,13 @@ def _delete_duplicates_from_table(
         if not duplicate_ids:
             break
         all_non_identical_duplicates.extend(non_identical_duplicates)
-        deleted_rows = (
-            session.query(table)
-            .filter(table.id.in_(duplicate_ids))
-            .delete(synchronize_session=False)
-        )
-        total_deleted_rows += deleted_rows
+        for i in range(0, len(duplicate_ids), MAX_ROWS_TO_PURGE):
+            deleted_rows = (
+                session.query(table)
+                .filter(table.id.in_(duplicate_ids[i : i + MAX_ROWS_TO_PURGE]))
+                .delete(synchronize_session=False)
+            )
+            total_deleted_rows += deleted_rows
     return (total_deleted_rows, all_non_identical_duplicates)
 
 
diff --git a/tests/components/recorder/test_statistics.py b/tests/components/recorder/test_statistics.py
index 25590c712d9..c96465a671f 100644
--- a/tests/components/recorder/test_statistics.py
+++ b/tests/components/recorder/test_statistics.py
@@ -855,6 +855,179 @@ def test_delete_duplicates(caplog, tmpdir):
     assert "Found duplicated" not in caplog.text
 
 
+def test_delete_duplicates_many(caplog, tmpdir):
+    """Test removal of duplicated statistics."""
+    test_db_file = tmpdir.mkdir("sqlite").join("test_run_info.db")
+    dburl = f"{SQLITE_URL_PREFIX}//{test_db_file}"
+
+    module = "tests.components.recorder.models_schema_23"
+    importlib.import_module(module)
+    old_models = sys.modules[module]
+
+    period1 = dt_util.as_utc(dt_util.parse_datetime("2021-09-01 00:00:00"))
+    period2 = dt_util.as_utc(dt_util.parse_datetime("2021-09-30 23:00:00"))
+    period3 = dt_util.as_utc(dt_util.parse_datetime("2021-10-01 00:00:00"))
+    period4 = dt_util.as_utc(dt_util.parse_datetime("2021-10-31 23:00:00"))
+
+    external_energy_statistics_1 = (
+        {
+            "start": period1,
+            "last_reset": None,
+            "state": 0,
+            "sum": 2,
+        },
+        {
+            "start": period2,
+            "last_reset": None,
+            "state": 1,
+            "sum": 3,
+        },
+        {
+            "start": period3,
+            "last_reset": None,
+            "state": 2,
+            "sum": 4,
+        },
+        {
+            "start": period4,
+            "last_reset": None,
+            "state": 3,
+            "sum": 5,
+        },
+        {
+            "start": period4,
+            "last_reset": None,
+            "state": 3,
+            "sum": 5,
+        },
+    )
+    external_energy_metadata_1 = {
+        "has_mean": False,
+        "has_sum": True,
+        "name": "Total imported energy",
+        "source": "test",
+        "statistic_id": "test:total_energy_import_tariff_1",
+        "unit_of_measurement": "kWh",
+    }
+    external_energy_statistics_2 = (
+        {
+            "start": period1,
+            "last_reset": None,
+            "state": 0,
+            "sum": 20,
+        },
+        {
+            "start": period2,
+            "last_reset": None,
+            "state": 1,
+            "sum": 30,
+        },
+        {
+            "start": period3,
+            "last_reset": None,
+            "state": 2,
+            "sum": 40,
+        },
+        {
+            "start": period4,
+            "last_reset": None,
+            "state": 3,
+            "sum": 50,
+        },
+        {
+            "start": period4,
+            "last_reset": None,
+            "state": 3,
+            "sum": 50,
+        },
+    )
+    external_energy_metadata_2 = {
+        "has_mean": False,
+        "has_sum": True,
+        "name": "Total imported energy",
+        "source": "test",
+        "statistic_id": "test:total_energy_import_tariff_2",
+        "unit_of_measurement": "kWh",
+    }
+    external_co2_statistics = (
+        {
+            "start": period1,
+            "last_reset": None,
+            "mean": 10,
+        },
+        {
+            "start": period2,
+            "last_reset": None,
+            "mean": 30,
+        },
+        {
+            "start": period3,
+            "last_reset": None,
+            "mean": 60,
+        },
+        {
+            "start": period4,
+            "last_reset": None,
+            "mean": 90,
+        },
+    )
+    external_co2_metadata = {
+        "has_mean": True,
+        "has_sum": False,
+        "name": "Fossil percentage",
+        "source": "test",
+        "statistic_id": "test:fossil_percentage",
+        "unit_of_measurement": "%",
+    }
+
+    # Create some duplicated statistics with schema version 23
+    with patch.object(recorder, "models", old_models), patch.object(
+        recorder.migration, "SCHEMA_VERSION", old_models.SCHEMA_VERSION
+    ), patch(
+        "homeassistant.components.recorder.create_engine", new=_create_engine_test
+    ):
+        hass = get_test_home_assistant()
+        setup_component(hass, "recorder", {"recorder": {"db_url": dburl}})
+        wait_recording_done(hass)
+        wait_recording_done(hass)
+
+        with session_scope(hass=hass) as session:
+            session.add(
+                recorder.models.StatisticsMeta.from_meta(external_energy_metadata_1)
+            )
+            session.add(
+                recorder.models.StatisticsMeta.from_meta(external_energy_metadata_2)
+            )
+            session.add(recorder.models.StatisticsMeta.from_meta(external_co2_metadata))
+        with session_scope(hass=hass) as session:
+            for stat in external_energy_statistics_1:
+                session.add(recorder.models.Statistics.from_stats(1, stat))
+            for _ in range(3000):
+                session.add(
+                    recorder.models.Statistics.from_stats(
+                        1, external_energy_statistics_1[-1]
+                    )
+                )
+            for stat in external_energy_statistics_2:
+                session.add(recorder.models.Statistics.from_stats(2, stat))
+            for stat in external_co2_statistics:
+                session.add(recorder.models.Statistics.from_stats(3, stat))
+
+        hass.stop()
+
+    # Test that the duplicates are removed during migration from schema 23
+    hass = get_test_home_assistant()
+    setup_component(hass, "recorder", {"recorder": {"db_url": dburl}})
+    hass.start()
+    wait_recording_done(hass)
+    wait_recording_done(hass)
+    hass.stop()
+
+    assert "Deleted 3002 duplicated statistics rows" in caplog.text
+    assert "Found non identical" not in caplog.text
+    assert "Found duplicated" not in caplog.text
+
+
 @pytest.mark.freeze_time("2021-08-01 00:00:00+00:00")
 def test_delete_duplicates_non_identical(caplog, tmpdir):
     """Test removal of duplicated statistics."""