Force recorder shutdown at final write event (#49145)

* Force recorder shutdown at EVENT_HOMEASSISTANT_FINAL_WRITE

* remove unreachable

* remove unreachable

* simplify

* cancel in async
Author: J. Nick Koston, 2021-04-18 23:24:17 -10:00 (committed by GitHub)
Commit: e24f5831a2 (parent 6048e88c8b)
2 changed files with 58 additions and 5 deletions
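
For orientation: the fix drains the recorder queue when EVENT_HOMEASSISTANT_FINAL_WRITE fires and then enqueues a None sentinel, so the worker thread sees the stop request immediately instead of retrying a backlog of failing writes. Below is a minimal, standalone sketch of that drain-then-sentinel pattern, assuming a hypothetical Worker thread that treats None as a stop signal (illustrative only, not code from this commit):

import queue
import threading


class Worker(threading.Thread):
    """Hypothetical consumer thread that treats a None item as a stop request."""

    def __init__(self):
        super().__init__()
        self.queue = queue.Queue()

    def run(self):
        while True:
            item = self.queue.get()
            if item is None:
                return  # sentinel seen: shut down
            # ... process item, possibly retrying on database errors ...

    def force_shutdown(self):
        """Drain pending items so the sentinel is the next thing run() sees."""
        while True:
            try:
                self.queue.get_nowait()
            except queue.Empty:
                break
        self.queue.put(None)
        self.join()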

Changes in the recorder integration:

@@ -21,6 +21,7 @@ from homeassistant.components import persistent_notification
 from homeassistant.const import (
     ATTR_ENTITY_ID,
     CONF_EXCLUDE,
+    EVENT_HOMEASSISTANT_FINAL_WRITE,
     EVENT_HOMEASSISTANT_STARTED,
     EVENT_HOMEASSISTANT_STOP,
     EVENT_STATE_CHANGED,
@@ -338,10 +339,11 @@ class Recorder(threading.Thread):
             "The recorder queue reached the maximum size of %s; Events are no longer being recorded",
             MAX_QUEUE_BACKLOG,
         )
-        self._stop_queue_watcher_and_event_listener()
+        self._async_stop_queue_watcher_and_event_listener()
 
-    def _stop_queue_watcher_and_event_listener(self):
-        """Stop watching the queue."""
+    @callback
+    def _async_stop_queue_watcher_and_event_listener(self):
+        """Stop watching the queue and listening for events."""
         if self._queue_watcher:
             self._queue_watcher()
             self._queue_watcher = None
@@ -370,11 +372,31 @@
     def async_register(self, shutdown_task, hass_started):
         """Post connection initialize."""
 
+        def _empty_queue(event):
+            """Empty the queue if its still present at final write."""
+
+            # If the queue is full of events to be processed because
+            # the database is so broken that every event results in a retry
+            # we will never be able to get though the events to shutdown in time.
+            #
+            # We drain all the events in the queue and then insert
+            # an empty one to ensure the next thing the recorder sees
+            # is a request to shutdown.
+            while True:
+                try:
+                    self.queue.get_nowait()
+                except queue.Empty:
+                    break
+            self.queue.put(None)
+
+        self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_FINAL_WRITE, _empty_queue)
+
         def shutdown(event):
             """Shut down the Recorder."""
             if not hass_started.done():
                 hass_started.set_result(shutdown_task)
             self.queue.put(None)
+            self.hass.add_job(self._async_stop_queue_watcher_and_event_listener)
             self.join()
 
         self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, shutdown)
@@ -399,7 +421,7 @@
             "The recorder could not start, check [the logs](/config/logs)",
             "Recorder",
         )
-        self._stop_queue_watcher_and_event_listener()
+        self._async_stop_queue_watcher_and_event_listener()
 
     @callback
     def async_connection_success(self):
@@ -836,6 +858,6 @@
 
     def _shutdown(self):
         """Save end time for current run."""
-        self._stop_queue_watcher_and_event_listener()
+        self.hass.add_job(self._async_stop_queue_watcher_and_event_listener)
         self._end_session()
         self._close_connection()
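
A note on the hass.add_job(...) calls above: _async_stop_queue_watcher_and_event_listener is now a @callback that must run in the event loop, while shutdown() and _shutdown() execute in the recorder's worker thread, so the cleanup is scheduled onto the loop with hass.add_job rather than called directly. A rough sketch of that pattern, with a hypothetical ExampleWorker class (illustrative, not part of the commit):

from homeassistant.core import HomeAssistant, callback


class ExampleWorker:
    """Hypothetical object that owns an event-bus listener but does its work in a thread."""

    def __init__(self, hass: HomeAssistant) -> None:
        self.hass = hass
        self._remove_listener = None  # unsubscribe callable, set from the event loop

    @callback
    def _async_stop_listeners(self) -> None:
        """Must run in the event loop: detach the bus listener."""
        if self._remove_listener:
            self._remove_listener()
            self._remove_listener = None

    def _shutdown(self) -> None:
        """Runs in the worker thread: hand the loop-only cleanup to the loop."""
        self.hass.add_job(self._async_stop_listeners)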

Changes in the recorder tests:

@@ -24,6 +24,7 @@ from homeassistant.components.recorder.const import DATA_INSTANCE
 from homeassistant.components.recorder.models import Events, RecorderRuns, States
 from homeassistant.components.recorder.util import session_scope
 from homeassistant.const import (
+    EVENT_HOMEASSISTANT_FINAL_WRITE,
     EVENT_HOMEASSISTANT_STARTED,
     EVENT_HOMEASSISTANT_STOP,
     MATCH_ALL,
@@ -265,6 +266,36 @@ def test_saving_state_with_sqlalchemy_exception(hass, hass_recorder, caplog):
     assert "SQLAlchemyError error processing event" not in caplog.text
 
 
+async def test_force_shutdown_with_queue_of_writes_that_generate_exceptions(
+    hass, async_setup_recorder_instance, caplog
+):
+    """Test forcing shutdown."""
+    instance = await async_setup_recorder_instance(hass)
+
+    entity_id = "test.recorder"
+    attributes = {"test_attr": 5, "test_attr_10": "nice"}
+
+    await async_wait_recording_done(hass, instance)
+
+    with patch.object(instance, "db_retry_wait", 0.2), patch.object(
+        instance.event_session,
+        "flush",
+        side_effect=OperationalError(
+            "insert the state", "fake params", "forced to fail"
+        ),
+    ):
+        for _ in range(100):
+            hass.states.async_set(entity_id, "on", attributes)
+            hass.states.async_set(entity_id, "off", attributes)
+
+        hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP)
+        hass.bus.async_fire(EVENT_HOMEASSISTANT_FINAL_WRITE)
+        await hass.async_block_till_done()
+
+    assert "Error executing query" in caplog.text
+    assert "Error saving events" not in caplog.text
+
+
 def test_saving_event(hass, hass_recorder):
     """Test saving and restoring an event."""
     hass = hass_recorder()
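
The new test simulates a permanently broken database by patching the session's flush with a side_effect exception. A standalone illustration of that mocking pattern, using a hypothetical FakeSession unrelated to the recorder:

from unittest.mock import patch

from sqlalchemy.exc import OperationalError


class FakeSession:
    """Hypothetical stand-in for a SQLAlchemy session."""

    def flush(self):
        """Pretend to write pending rows to the database."""


session = FakeSession()
with patch.object(
    session,
    "flush",
    side_effect=OperationalError("insert the state", "fake params", "forced to fail"),
):
    try:
        session.flush()  # every call now raises, like the broken database in the test
    except OperationalError as err:
        print(f"flush failed as expected: {err}")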