From 5b8591ec7e1b391396bf072477de9cee62ba135b Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Fri, 23 Feb 2024 21:46:00 -1000 Subject: [PATCH] Avoid reschedule churn in Storage.async_delay_save (#111091) * Avoid circular import in Storage.async_delay_save We call Storage.async_delay_save for every entity being added or removed from the registry. The late import took more time than everything else in the function. * Avoid reschedule churn in Storage.async_delay_save When we are adding or removing entities we will call async_delay_save quite often which has to add and remove a TimerHandle on the event loop which can add up when there are a lot of registry items changing. If the timer handle still has 80% of the time remaining on it we will avoid resceduling and let it fire at the time the original async_delay_save call was made. This ensures we do not force the event loop to rebuild its heapq because too many timer handlers were cancelled at once * div0 * add coverage for 0 since we had none * fix bad conflict * tweaks * tweaks * tweaks * tweaks * tweaks * tweaks * more test fixes * mqtt tests rely on event loop overhead --- homeassistant/helpers/storage.py | 30 +++++-- tests/components/frontend/test_init.py | 11 ++- tests/components/mqtt/test_init.py | 4 + tests/helpers/test_storage.py | 110 ++++++++++++++++++++++++- tests/test_config_entries.py | 12 +-- 5 files changed, 149 insertions(+), 18 deletions(-) diff --git a/homeassistant/helpers/storage.py b/homeassistant/helpers/storage.py index 4b88ea86dea..9817f30f736 100644 --- a/homeassistant/helpers/storage.py +++ b/homeassistant/helpers/storage.py @@ -42,6 +42,7 @@ _LOGGER = logging.getLogger(__name__) STORAGE_SEMAPHORE = "storage_semaphore" + _T = TypeVar("_T", bound=Mapping[str, Any] | Sequence[Any]) @@ -108,13 +109,14 @@ class Store(Generic[_T]): self.hass = hass self._private = private self._data: dict[str, Any] | None = None - self._unsub_delay_listener: asyncio.TimerHandle | None = None + self._delay_handle: asyncio.TimerHandle | None = None self._unsub_final_write_listener: CALLBACK_TYPE | None = None self._write_lock = asyncio.Lock() self._load_task: asyncio.Future[_T | None] | None = None self._encoder = encoder self._atomic_writes = atomic_writes self._read_only = read_only + self._next_write_time = 0.0 @cached_property def path(self): @@ -286,6 +288,11 @@ class Store(Generic[_T]): "data_func": data_func, } + next_when = self.hass.loop.time() + delay + if self._delay_handle and self._delay_handle.when() < next_when: + self._next_write_time = next_when + return + self._async_cleanup_delay_listener() self._async_ensure_final_write_listener() @@ -293,13 +300,24 @@ class Store(Generic[_T]): return # We use call_later directly here to avoid a circular import - self._unsub_delay_listener = self.hass.loop.call_later( - delay, self._async_schedule_callback_delayed_write + self._async_reschedule_delayed_write(next_when) + + @callback + def _async_reschedule_delayed_write(self, when: float) -> None: + """Reschedule a delayed write.""" + self._delay_handle = self.hass.loop.call_at( + when, self._async_schedule_callback_delayed_write ) @callback def _async_schedule_callback_delayed_write(self) -> None: """Schedule the delayed write in a task.""" + if self.hass.loop.time() < self._next_write_time: + # Timer fired too early because there were multiple + # calls to async_delay_save before the first one + # wrote. Reschedule the timer to the next write time. + self._async_reschedule_delayed_write(self._next_write_time) + return self.hass.async_create_task(self._async_callback_delayed_write()) @callback @@ -320,9 +338,9 @@ class Store(Generic[_T]): @callback def _async_cleanup_delay_listener(self) -> None: """Clean up a delay listener.""" - if self._unsub_delay_listener is not None: - self._unsub_delay_listener.cancel() - self._unsub_delay_listener = None + if self._delay_handle is not None: + self._delay_handle.cancel() + self._delay_handle = None async def _async_callback_delayed_write(self) -> None: """Handle a delayed write callback.""" diff --git a/tests/components/frontend/test_init.py b/tests/components/frontend/test_init.py index 274d916f10d..f04d4a9bc52 100644 --- a/tests/components/frontend/test_init.py +++ b/tests/components/frontend/test_init.py @@ -1,10 +1,10 @@ """The tests for Home Assistant frontend.""" -from datetime import timedelta from http import HTTPStatus import re from typing import Any from unittest.mock import patch +from freezegun.api import FrozenDateTimeFactory import pytest from homeassistant.components.frontend import ( @@ -20,7 +20,6 @@ from homeassistant.components.websocket_api.const import TYPE_RESULT from homeassistant.core import HomeAssistant from homeassistant.loader import async_get_integration from homeassistant.setup import async_setup_component -from homeassistant.util import dt as dt_util from tests.common import MockUser, async_capture_events, async_fire_time_changed from tests.typing import MockHAClientWebSocket, WebSocketGenerator @@ -220,7 +219,10 @@ async def test_themes_persist( async def test_themes_save_storage( - hass: HomeAssistant, hass_storage: dict[str, Any], frontend_themes + hass: HomeAssistant, + hass_storage: dict[str, Any], + freezer: FrozenDateTimeFactory, + frontend_themes, ) -> None: """Test that theme settings are restores after restart.""" @@ -233,7 +235,8 @@ async def test_themes_save_storage( ) # To trigger the call_later - async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=60)) + freezer.tick(60.0) + async_fire_time_changed(hass) # To execute the save await hass.async_block_till_done() diff --git a/tests/components/mqtt/test_init.py b/tests/components/mqtt/test_init.py index 35e27d44740..95ac94751ad 100644 --- a/tests/components/mqtt/test_init.py +++ b/tests/components/mqtt/test_init.py @@ -1400,6 +1400,8 @@ async def test_replaying_payload_same_topic( hass, "test/state", "online", qos=0, retain=True ) # Simulate a (retained) message played back await hass.async_block_till_done() + await hass.async_block_till_done() + assert len(calls_a) == 1 mqtt_client_mock.subscribe.assert_called() calls_a = [] @@ -1498,6 +1500,7 @@ async def test_replaying_payload_after_resubscribing( await hass.async_block_till_done() async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) await hass.async_block_till_done() + await hass.async_block_till_done() mqtt_client_mock.subscribe.assert_called() # Simulate a (retained) message played back @@ -1638,6 +1641,7 @@ async def test_not_calling_unsubscribe_with_active_subscribers( await hass.async_block_till_done() async_fire_time_changed(hass, utcnow() + timedelta(seconds=3)) # cooldown await hass.async_block_till_done() + await hass.async_block_till_done() assert mqtt_client_mock.subscribe.called unsub() diff --git a/tests/helpers/test_storage.py b/tests/helpers/test_storage.py index ab1889eccea..5382e18b15e 100644 --- a/tests/helpers/test_storage.py +++ b/tests/helpers/test_storage.py @@ -6,6 +6,7 @@ import os from typing import Any, NamedTuple from unittest.mock import Mock, patch +from freezegun.api import FrozenDateTimeFactory import py import pytest @@ -19,7 +20,11 @@ from homeassistant.helpers import issue_registry as ir, storage from homeassistant.util import dt as dt_util from homeassistant.util.color import RGBColor -from tests.common import async_fire_time_changed, async_test_home_assistant +from tests.common import ( + async_fire_time_changed, + async_fire_time_changed_exact, + async_test_home_assistant, +) MOCK_VERSION = 1 MOCK_VERSION_2 = 2 @@ -115,7 +120,7 @@ async def test_loading_parallel( async def test_saving_with_delay( - hass: HomeAssistant, store, hass_storage: dict[str, Any] + hass: HomeAssistant, store: storage.Store, hass_storage: dict[str, Any] ) -> None: """Test saving data after a delay.""" store.async_delay_save(lambda: MOCK_DATA, 1) @@ -131,6 +136,88 @@ async def test_saving_with_delay( } +async def test_saving_with_delay_churn_reduction( + hass: HomeAssistant, + store: storage.Store, + hass_storage: dict[str, Any], + freezer: FrozenDateTimeFactory, +) -> None: + """Test saving data after a delay with timer churn reduction.""" + store.async_delay_save(lambda: MOCK_DATA, 1) + assert store.key not in hass_storage + + freezer.tick(0.2) + async_fire_time_changed_exact(hass) + await hass.async_block_till_done() + assert store.key not in hass_storage + + freezer.tick(1) + async_fire_time_changed_exact(hass) + await hass.async_block_till_done() + assert hass_storage[store.key] == { + "version": MOCK_VERSION, + "minor_version": 1, + "key": MOCK_KEY, + "data": MOCK_DATA, + } + + del hass_storage[store.key] + # Simulate what some of the registries do when they add 100 entities + for _ in range(100): + store.async_delay_save(lambda: MOCK_DATA, 1) + + freezer.tick(0.2) + async_fire_time_changed_exact(hass) + await hass.async_block_till_done() + assert store.key not in hass_storage + store.async_delay_save(lambda: MOCK_DATA, 1) + + freezer.tick(1) + async_fire_time_changed_exact(hass) + await hass.async_block_till_done() + assert store.key in hass_storage + + del hass_storage[store.key] + + store.async_delay_save(lambda: MOCK_DATA, 1) + freezer.tick(0.5) + async_fire_time_changed_exact(hass) + await hass.async_block_till_done() + assert store.key not in hass_storage + + store.async_delay_save(lambda: MOCK_DATA, 1) + freezer.tick(0.8) + async_fire_time_changed_exact(hass) + await hass.async_block_till_done() + assert store.key not in hass_storage + + store.async_delay_save(lambda: MOCK_DATA, 1) + freezer.tick(0.8) + async_fire_time_changed_exact(hass) + await hass.async_block_till_done() + assert store.key not in hass_storage + + freezer.tick(0.2) + async_fire_time_changed_exact(hass) + await hass.async_block_till_done() + assert store.key in hass_storage + + # Make sure if we do another delayed save + # and one with a shorter delay, the shorter delay wins + del hass_storage[store.key] + store.async_delay_save(lambda: MOCK_DATA, 2) + freezer.tick(0.2) + async_fire_time_changed_exact(hass) + await hass.async_block_till_done() + assert store.key not in hass_storage + + store.async_delay_save(lambda: MOCK_DATA, 1) + freezer.tick(1.0) + async_fire_time_changed_exact(hass) + await hass.async_block_till_done() + assert store.key in hass_storage + + async def test_saving_on_final_write( hass: HomeAssistant, hass_storage: dict[str, Any] ) -> None: @@ -281,6 +368,23 @@ async def test_multiple_delay_save_calls( assert data == {"delay": "no"} +async def test_delay_save_zero( + hass: HomeAssistant, store: storage.Store, hass_storage: dict[str, Any] +) -> None: + """Test async_delay_save accepts 0.""" + store.async_delay_save(lambda: {"delay": "0"}, 0) + # sleep is to run one event loop to get the task scheduled + await asyncio.sleep(0) + await hass.async_block_till_done() + assert store.key in hass_storage + assert hass_storage[store.key] == { + "version": MOCK_VERSION, + "minor_version": 1, + "key": MOCK_KEY, + "data": {"delay": "0"}, + } + + async def test_multiple_save_calls( hass: HomeAssistant, store, hass_storage: dict[str, Any] ) -> None: @@ -706,7 +810,7 @@ async def test_os_error_is_fatal(tmpdir: py.path.local) -> None: async def test_read_only_store( - hass: HomeAssistant, read_only_store, hass_storage: dict[str, Any] + hass: HomeAssistant, read_only_store: storage.Store, hass_storage: dict[str, Any] ) -> None: """Test store opened in read only mode does not save.""" read_only_store.async_delay_save(lambda: MOCK_DATA, 1) diff --git a/tests/test_config_entries.py b/tests/test_config_entries.py index 6bf20234798..36a38b98622 100644 --- a/tests/test_config_entries.py +++ b/tests/test_config_entries.py @@ -8,6 +8,7 @@ import logging from typing import Any from unittest.mock import ANY, AsyncMock, Mock, patch +from freezegun.api import FrozenDateTimeFactory import pytest from syrupy.assertion import SnapshotAssertion @@ -733,13 +734,13 @@ async def test_entries_excludes_ignore_and_disabled( ] -async def test_saving_and_loading(hass: HomeAssistant) -> None: +async def test_saving_and_loading( + hass: HomeAssistant, freezer: FrozenDateTimeFactory +) -> None: """Test that we're saving and loading correctly.""" mock_integration( hass, - MockModule( - "test", async_setup_entry=lambda *args: AsyncMock(return_value=True) - ), + MockModule("test", async_setup_entry=AsyncMock(return_value=True)), ) mock_platform(hass, "test.config_flow", None) @@ -784,7 +785,8 @@ async def test_saving_and_loading(hass: HomeAssistant) -> None: ) # To trigger the call_later - async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=1)) + freezer.tick(1.0) + async_fire_time_changed(hass) # To execute the save await hass.async_block_till_done()