diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py index 8798b213ec9..d2930988aee 100644 --- a/homeassistant/components/recorder/__init__.py +++ b/homeassistant/components/recorder/__init__.py @@ -212,7 +212,7 @@ class Recorder(threading.Thread): self.auto_purge = auto_purge self.keep_days = keep_days self.commit_interval = commit_interval - self.queue: Any = queue.Queue() + self.queue: Any = queue.SimpleQueue() self.recording_start = dt_util.utcnow() self.db_url = uri self.db_max_retries = db_max_retries @@ -339,16 +339,13 @@ class Recorder(threading.Thread): if event is None: self._close_run() self._close_connection() - self.queue.task_done() return if isinstance(event, PurgeTask): # Schedule a new purge task if this one didn't finish if not purge.purge_old_data(self, event.keep_days, event.repack): self.queue.put(PurgeTask(event.keep_days, event.repack)) - self.queue.task_done() continue if event.event_type == EVENT_TIME_CHANGED: - self.queue.task_done() self._keepalive_count += 1 if self._keepalive_count >= KEEPALIVE_TIME: self._keepalive_count = 0 @@ -360,13 +357,11 @@ class Recorder(threading.Thread): self._commit_event_session_or_retry() continue if event.event_type in self.exclude_t: - self.queue.task_done() continue entity_id = event.data.get(ATTR_ENTITY_ID) if entity_id is not None: if not self.entity_filter(entity_id): - self.queue.task_done() continue try: @@ -409,8 +404,6 @@ class Recorder(threading.Thread): if not self.commit_interval: self._commit_event_session_or_retry() - self.queue.task_done() - def _send_keep_alive(self): try: _LOGGER.debug("Sending keepalive") @@ -493,8 +486,19 @@ class Recorder(threading.Thread): self.queue.put(event) def block_till_done(self): - """Block till all events processed.""" - self.queue.join() + """Block till all events processed. + + This is only called in tests. + + This only blocks until the queue is empty + which does not mean the recorder is done. + + Call tests.common's wait_recording_done + after calling this to ensure the data + is in the database. + """ + while not self.queue.empty(): + time.sleep(0.025) def _setup_connection(self): """Ensure database is ready to fly.""" diff --git a/tests/components/recorder/test_purge.py b/tests/components/recorder/test_purge.py index 93fb6e51621..a93e3537905 100644 --- a/tests/components/recorder/test_purge.py +++ b/tests/components/recorder/test_purge.py @@ -10,6 +10,8 @@ from homeassistant.components.recorder.purge import purge_old_data from homeassistant.components.recorder.util import session_scope from homeassistant.util import dt as dt_util +from .common import wait_recording_done + from tests.async_mock import patch from tests.common import get_test_home_assistant, init_recorder_component @@ -37,6 +39,7 @@ class TestRecorderPurge(unittest.TestCase): self.hass.block_till_done() self.hass.data[DATA_INSTANCE].block_till_done() + wait_recording_done(self.hass) with recorder.session_scope(hass=self.hass) as session: for event_id in range(6): @@ -72,6 +75,7 @@ class TestRecorderPurge(unittest.TestCase): self.hass.block_till_done() self.hass.data[DATA_INSTANCE].block_till_done() + wait_recording_done(self.hass) with recorder.session_scope(hass=self.hass) as session: for event_id in range(6): @@ -103,6 +107,7 @@ class TestRecorderPurge(unittest.TestCase): self.hass.block_till_done() self.hass.data[DATA_INSTANCE].block_till_done() + wait_recording_done(self.hass) with recorder.session_scope(hass=self.hass) as session: for rec_id in range(6): @@ -183,6 +188,7 @@ class TestRecorderPurge(unittest.TestCase): assert recorder_runs.count() == 7 self.hass.data[DATA_INSTANCE].block_till_done() + wait_recording_done(self.hass) # run purge method - no service data, use defaults self.hass.services.call("recorder", "purge") @@ -190,6 +196,7 @@ class TestRecorderPurge(unittest.TestCase): # Small wait for recorder thread self.hass.data[DATA_INSTANCE].block_till_done() + wait_recording_done(self.hass) # only purged old events assert states.count() == 4 @@ -201,6 +208,7 @@ class TestRecorderPurge(unittest.TestCase): # Small wait for recorder thread self.hass.data[DATA_INSTANCE].block_till_done() + wait_recording_done(self.hass) # we should only have 2 states left after purging assert states.count() == 2 @@ -223,6 +231,7 @@ class TestRecorderPurge(unittest.TestCase): self.hass.services.call("recorder", "purge", service_data=service_data) self.hass.block_till_done() self.hass.data[DATA_INSTANCE].block_till_done() + wait_recording_done(self.hass) assert ( mock_logger.debug.mock_calls[5][1][0] == "Vacuuming SQL DB to free space"