Make recorder block_till_done reliable (#40043)

This commit is contained in:
J. Nick Koston 2020-09-14 01:48:29 -05:00 committed by GitHub
parent ad1a71ebc3
commit c19b5c5ac3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -196,6 +196,10 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
PurgeTask = namedtuple("PurgeTask", ["keep_days", "repack"]) PurgeTask = namedtuple("PurgeTask", ["keep_days", "repack"])
class WaitTask:
"""An object to insert into the recorder queue to tell it set the _queue_watch event."""
class Recorder(threading.Thread): class Recorder(threading.Thread):
"""A threaded recorder class.""" """A threaded recorder class."""
@ -226,6 +230,7 @@ class Recorder(threading.Thread):
self.db_retry_wait = db_retry_wait self.db_retry_wait = db_retry_wait
self.db_integrity_check = db_integrity_check self.db_integrity_check = db_integrity_check
self.async_db_ready = asyncio.Future() self.async_db_ready = asyncio.Future()
self._queue_watch = threading.Event()
self.engine: Any = None self.engine: Any = None
self.run_info: Any = None self.run_info: Any = None
@ -353,6 +358,9 @@ class Recorder(threading.Thread):
if not purge.purge_old_data(self, event.keep_days, event.repack): if not purge.purge_old_data(self, event.keep_days, event.repack):
self.queue.put(PurgeTask(event.keep_days, event.repack)) self.queue.put(PurgeTask(event.keep_days, event.repack))
continue continue
if isinstance(event, WaitTask):
self._queue_watch.set()
continue
if event.event_type == EVENT_TIME_CHANGED: if event.event_type == EVENT_TIME_CHANGED:
self._keepalive_count += 1 self._keepalive_count += 1
if self._keepalive_count >= KEEPALIVE_TIME: if self._keepalive_count >= KEEPALIVE_TIME:
@ -506,8 +514,9 @@ class Recorder(threading.Thread):
after calling this to ensure the data after calling this to ensure the data
is in the database. is in the database.
""" """
while not self.queue.empty(): self._queue_watch.clear()
time.sleep(0.025) self.queue.put(WaitTask())
self._queue_watch.wait()
def _setup_connection(self): def _setup_connection(self):
"""Ensure database is ready to fly.""" """Ensure database is ready to fly."""