Reduce complexity of storage writes (#42576)

* Reduce complexity of storage writes

* add test

* stop hass

* workaround bad test
Author: J. Nick Koston
Date: 2020-10-29 22:35:51 -05:00 (committed by GitHub)
Parent: f78e75e16f
Commit: 2bbd2a6e70
5 changed files with 93 additions and 12 deletions

View File

@@ -820,7 +820,7 @@ class EventBus:
         except (KeyError, ValueError):
             # KeyError is key event_type listener did not exist
             # ValueError if listener did not exist within event_type
-            _LOGGER.warning("Unable to remove unknown job listener %s", hassjob)
+            _LOGGER.exception("Unable to remove unknown job listener %s", hassjob)
 
 
 class State:
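The switch above from _LOGGER.warning to _LOGGER.exception changes two things: the record is emitted at ERROR level instead of WARNING, and the traceback of the caught KeyError/ValueError is attached to it. A minimal standalone illustration of that difference (plain stdlib logging, not Home Assistant code; "listeners" and "job" are stand-ins):

import logging

logging.basicConfig(level=logging.DEBUG)
_LOGGER = logging.getLogger(__name__)

listeners = {}
try:
    listeners.pop("unknown")  # stand-in for the failing listener removal
except KeyError:
    # warning(): plain WARNING record, no traceback attached
    _LOGGER.warning("Unable to remove unknown job listener %s", "job")
    # exception(): ERROR-level record with the current traceback appended
    _LOGGER.exception("Unable to remove unknown job listener %s", "job")

The level change is also why the sensor availability test further down has to exclude these records from its ERROR count.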

View File

@@ -138,9 +138,6 @@ class Store:
         """Save data."""
         self._data = {"version": self.version, "key": self.key, "data": data}
 
-        self._async_cleanup_delay_listener()
-        self._async_cleanup_final_write_listener()
-
         if self.hass.state == CoreState.stopping:
             self._async_ensure_final_write_listener()
             return
@@ -153,16 +150,14 @@ class Store:
         self._data = {"version": self.version, "key": self.key, "data_func": data_func}
 
         self._async_cleanup_delay_listener()
-        self._async_cleanup_final_write_listener()
+        self._async_ensure_final_write_listener()
 
         if self.hass.state == CoreState.stopping:
-            self._async_ensure_final_write_listener()
             return
 
         self._unsub_delay_listener = async_call_later(
             self.hass, delay, self._async_callback_delayed_write
         )
-        self._async_ensure_final_write_listener()
 
     @callback
     def _async_ensure_final_write_listener(self):
@@ -192,20 +187,20 @@ class Store:
         if self.hass.state == CoreState.stopping:
             self._async_ensure_final_write_listener()
             return
-        self._unsub_delay_listener = None
-        self._async_cleanup_final_write_listener()
         await self._async_handle_write_data()
 
     async def _async_callback_final_write(self, _event):
         """Handle a write because Home Assistant is in final write state."""
         self._unsub_final_write_listener = None
-        self._async_cleanup_delay_listener()
         await self._async_handle_write_data()
 
     async def _async_handle_write_data(self, *_args):
         """Handle writing the config."""
 
         async with self._write_lock:
+            self._async_cleanup_delay_listener()
+            self._async_cleanup_final_write_listener()
+
             if self._data is None:
                 # Another write already consumed the data
                 return
@@ -229,7 +224,7 @@ class Store:
         if not os.path.isdir(os.path.dirname(path)):
             os.makedirs(os.path.dirname(path))
 
-        _LOGGER.debug("Writing data for %s", self.key)
+        _LOGGER.debug("Writing data for %s to %s", self.key, path)
         json_util.save_json(path, data, self._private, encoder=self._encoder)
 
     async def _async_migrate_func(self, old_version, old_data):
@@ -238,6 +233,9 @@ class Store:
 
     async def async_remove(self):
         """Remove all data."""
+        self._async_cleanup_delay_listener()
+        self._async_cleanup_final_write_listener()
+
         try:
             await self.hass.async_add_executor_job(os.unlink, self.path)
         except FileNotFoundError:
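Taken together, the storage hunks above consolidate listener cleanup into the single write path: the entry points (async_save, async_delay_save and the delayed/final-write callbacks) now mostly just stash the data and, where needed, arm the final-write listener, while _async_handle_write_data cancels both listeners under the write lock and bails out at the existing self._data is None guard when another writer has already consumed the data. A standalone sketch of that pattern, using a toy class with made-up names (TinyStore, delay_save, _handle_write), not the real Store API:

import asyncio


class TinyStore:
    """Toy model of the consolidated write path sketched above."""

    def __init__(self):
        self._data = None
        self._write_lock = asyncio.Lock()
        self._pending = None  # stand-in for the delay/final-write listeners

    def delay_save(self, data, delay):
        """Stash the data and (re)arm the delayed trigger."""
        self._data = data
        self._cancel_pending()
        loop = asyncio.get_running_loop()
        self._pending = loop.call_later(
            delay, lambda: asyncio.ensure_future(self._handle_write())
        )

    async def save(self, data):
        """Write immediately through the same single write path."""
        self._data = data
        await self._handle_write()

    def _cancel_pending(self):
        if self._pending is not None:
            self._pending.cancel()
            self._pending = None

    async def _handle_write(self):
        async with self._write_lock:
            # Cleanup now lives in one place, under the lock.
            self._cancel_pending()
            if self._data is None:
                # Another write already consumed the data.
                return
            data, self._data = self._data, None
            await asyncio.sleep(0)  # stand-in for the executor doing file I/O
            print("writing", data)


async def main():
    store = TinyStore()
    store.delay_save({"delay": "yes"}, 0.5)
    # Concurrent saves race for the lock; whichever writer gets in next
    # writes the newest stashed data, and the stragglers fall out at the
    # `self._data is None` guard.
    await asyncio.gather(*(store.save({"savecount": i}) for i in range(6)))
    await asyncio.sleep(0.6)  # the cancelled delayed write never fires


asyncio.run(main())

Because removal does not go through that write path, async_remove now cancels both listeners itself (last storage hunk above); otherwise a pending delayed save could re-create the file after it was deleted, which is exactly what the new test at the bottom of this commit checks.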

View File

@@ -54,7 +54,11 @@ async def test_sensor_availability(
     # sensor has no more prices, state is "unavailable" from now on
     await _process_time_step(hass, mock_data, value="unavailable")
     await _process_time_step(hass, mock_data, value="unavailable")
-    num_errors = sum(1 for x in caplog.records if x.levelno == logging.ERROR)
+    num_errors = sum(
+        1
+        for x in caplog.records
+        if x.levelno == logging.ERROR and "unknown job listener" not in x.msg
+    )
     num_warnings = sum(1 for x in caplog.records if x.levelno == logging.WARNING)
     assert num_warnings == 1
     assert num_errors == 0
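The filter added above matches on x.msg, which for these records is the raw format string ("Unable to remove unknown job listener %s"), so the substring check runs against the literal message template. A small hypothetical helper in the same spirit (not part of this PR) that matches the rendered message instead, in case the ignored text ever comes from an interpolated argument:

import logging


def count_errors(records, ignore="unknown job listener"):
    """Count ERROR log records, skipping the known listener-removal complaint."""
    return sum(
        1
        for record in records
        if record.levelno == logging.ERROR and ignore not in record.getMessage()
    )

With pytest this would be called as count_errors(caplog.records).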

View File

@@ -186,6 +186,49 @@ async def test_writing_while_writing_delay(hass, store, hass_storage):
     assert data == {"delay": "no"}
 
 
+async def test_multiple_delay_save_calls(hass, store, hass_storage):
+    """Test a write while a write with changing delays."""
+    store.async_delay_save(lambda: {"delay": "yes"}, 1)
+    store.async_delay_save(lambda: {"delay": "yes"}, 2)
+    store.async_delay_save(lambda: {"delay": "yes"}, 3)
+
+    assert store.key not in hass_storage
+
+    await store.async_save({"delay": "no"})
+    assert hass_storage[store.key] == {
+        "version": MOCK_VERSION,
+        "key": MOCK_KEY,
+        "data": {"delay": "no"},
+    }
+
+    async_fire_time_changed(hass, dt.utcnow() + timedelta(seconds=1))
+    await hass.async_block_till_done()
+    assert hass_storage[store.key] == {
+        "version": MOCK_VERSION,
+        "key": MOCK_KEY,
+        "data": {"delay": "no"},
+    }
+
+    data = await store.async_load()
+    assert data == {"delay": "no"}
+
+
+async def test_multiple_save_calls(hass, store, hass_storage):
+    """Test multiple write tasks."""
+    assert store.key not in hass_storage
+
+    tasks = [store.async_save({"savecount": savecount}) for savecount in range(6)]
+    await asyncio.gather(*tasks)
+    assert hass_storage[store.key] == {
+        "version": MOCK_VERSION,
+        "key": MOCK_KEY,
+        "data": {"savecount": 5},
+    }
+
+    data = await store.async_load()
+    assert data == {"savecount": 5}
+
+
 async def test_migrator_no_existing_config(hass, store, hass_storage):
     """Test migrator with no existing config."""
     with patch("os.path.isfile", return_value=False), patch.object(

View File

@@ -0,0 +1,36 @@
+"""Tests for the storage helper with minimal mocking."""
+import asyncio
+from datetime import timedelta
+import os
+
+from homeassistant.helpers import storage
+from homeassistant.util import dt
+
+from tests.async_mock import patch
+from tests.common import async_fire_time_changed, async_test_home_assistant
+
+
+async def test_removing_while_delay_in_progress(tmpdir):
+    """Test removing while delay in progress."""
+
+    loop = asyncio.get_event_loop()
+    hass = await async_test_home_assistant(loop)
+
+    test_dir = await hass.async_add_executor_job(tmpdir.mkdir, "storage")
+
+    with patch.object(storage, "STORAGE_DIR", test_dir):
+        real_store = storage.Store(hass, 1, "remove_me")
+
+        await real_store.async_save({"delay": "no"})
+        assert await hass.async_add_executor_job(os.path.exists, real_store.path)
+
+        real_store.async_delay_save(lambda: {"delay": "yes"}, 1)
+
+        await real_store.async_remove()
+        assert not await hass.async_add_executor_job(os.path.exists, real_store.path)
+
+        async_fire_time_changed(hass, dt.utcnow() + timedelta(seconds=1))
+        await hass.async_block_till_done()
+        assert not await hass.async_add_executor_job(os.path.exists, real_store.path)
+
+    await hass.async_stop()