diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py index 98199bab430..733d8f248a8 100644 --- a/homeassistant/components/recorder/__init__.py +++ b/homeassistant/components/recorder/__init__.py @@ -21,6 +21,7 @@ from homeassistant.components import persistent_notification from homeassistant.const import ( ATTR_ENTITY_ID, CONF_EXCLUDE, + EVENT_HOMEASSISTANT_FINAL_WRITE, EVENT_HOMEASSISTANT_STARTED, EVENT_HOMEASSISTANT_STOP, EVENT_STATE_CHANGED, @@ -338,10 +339,11 @@ class Recorder(threading.Thread): "The recorder queue reached the maximum size of %s; Events are no longer being recorded", MAX_QUEUE_BACKLOG, ) - self._stop_queue_watcher_and_event_listener() + self._async_stop_queue_watcher_and_event_listener() - def _stop_queue_watcher_and_event_listener(self): - """Stop watching the queue.""" + @callback + def _async_stop_queue_watcher_and_event_listener(self): + """Stop watching the queue and listening for events.""" if self._queue_watcher: self._queue_watcher() self._queue_watcher = None @@ -370,11 +372,31 @@ class Recorder(threading.Thread): def async_register(self, shutdown_task, hass_started): """Post connection initialize.""" + def _empty_queue(event): + """Empty the queue if its still present at final write.""" + + # If the queue is full of events to be processed because + # the database is so broken that every event results in a retry + # we will never be able to get though the events to shutdown in time. + # + # We drain all the events in the queue and then insert + # an empty one to ensure the next thing the recorder sees + # is a request to shutdown. + while True: + try: + self.queue.get_nowait() + except queue.Empty: + break + self.queue.put(None) + + self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_FINAL_WRITE, _empty_queue) + def shutdown(event): """Shut down the Recorder.""" if not hass_started.done(): hass_started.set_result(shutdown_task) self.queue.put(None) + self.hass.add_job(self._async_stop_queue_watcher_and_event_listener) self.join() self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, shutdown) @@ -399,7 +421,7 @@ class Recorder(threading.Thread): "The recorder could not start, check [the logs](/config/logs)", "Recorder", ) - self._stop_queue_watcher_and_event_listener() + self._async_stop_queue_watcher_and_event_listener() @callback def async_connection_success(self): @@ -836,6 +858,6 @@ class Recorder(threading.Thread): def _shutdown(self): """Save end time for current run.""" - self._stop_queue_watcher_and_event_listener() + self.hass.add_job(self._async_stop_queue_watcher_and_event_listener) self._end_session() self._close_connection() diff --git a/tests/components/recorder/test_init.py b/tests/components/recorder/test_init.py index 67032e9f077..d3464088394 100644 --- a/tests/components/recorder/test_init.py +++ b/tests/components/recorder/test_init.py @@ -24,6 +24,7 @@ from homeassistant.components.recorder.const import DATA_INSTANCE from homeassistant.components.recorder.models import Events, RecorderRuns, States from homeassistant.components.recorder.util import session_scope from homeassistant.const import ( + EVENT_HOMEASSISTANT_FINAL_WRITE, EVENT_HOMEASSISTANT_STARTED, EVENT_HOMEASSISTANT_STOP, MATCH_ALL, @@ -265,6 +266,36 @@ def test_saving_state_with_sqlalchemy_exception(hass, hass_recorder, caplog): assert "SQLAlchemyError error processing event" not in caplog.text +async def test_force_shutdown_with_queue_of_writes_that_generate_exceptions( + hass, async_setup_recorder_instance, caplog +): + """Test forcing shutdown.""" + instance = await async_setup_recorder_instance(hass) + + entity_id = "test.recorder" + attributes = {"test_attr": 5, "test_attr_10": "nice"} + + await async_wait_recording_done(hass, instance) + + with patch.object(instance, "db_retry_wait", 0.2), patch.object( + instance.event_session, + "flush", + side_effect=OperationalError( + "insert the state", "fake params", "forced to fail" + ), + ): + for _ in range(100): + hass.states.async_set(entity_id, "on", attributes) + hass.states.async_set(entity_id, "off", attributes) + + hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP) + hass.bus.async_fire(EVENT_HOMEASSISTANT_FINAL_WRITE) + await hass.async_block_till_done() + + assert "Error executing query" in caplog.text + assert "Error saving events" not in caplog.text + + def test_saving_event(hass, hass_recorder): """Test saving and restoring an event.""" hass = hass_recorder()