diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py index 91c5fe65812..50c522f0e97 100644 --- a/homeassistant/components/recorder/__init__.py +++ b/homeassistant/components/recorder/__init__.py @@ -579,6 +579,34 @@ class EventTask(RecorderTask): instance._process_one_event(self.event) +@dataclass +class KeepAliveTask(RecorderTask): + """A keep alive to be sent.""" + + commit_before = False + + def run(self, instance: Recorder) -> None: + """Handle the task.""" + # pylint: disable-next=[protected-access] + instance._send_keep_alive() + + +@dataclass +class CommitTask(RecorderTask): + """Commit the event session.""" + + commit_before = False + + def run(self, instance: Recorder) -> None: + """Handle the task.""" + # pylint: disable-next=[protected-access] + instance._commit_event_session_or_retry() + + +COMMIT_TASK = CommitTask() +KEEP_ALIVE_TASK = KeepAliveTask() + + class Recorder(threading.Thread): """A threaded recorder class.""" @@ -621,9 +649,7 @@ class Recorder(threading.Thread): self.entity_filter = entity_filter self.exclude_t = exclude_t - self._timechanges_seen = 0 self._commits_without_expire = 0 - self._keepalive_count = 0 self._old_states: dict[str, States] = {} self._state_attributes_ids: LRU = LRU(STATE_ATTRIBUTES_ID_CACHE_SIZE) self._pending_state_attributes: dict[str, StateAttributes] = {} @@ -640,6 +666,10 @@ class Recorder(threading.Thread): self._db_executor: DBInterruptibleThreadPoolExecutor | None = None self._exclude_attributes_by_domain = exclude_attributes_by_domain + self._keep_alive_listener: CALLBACK_TYPE | None = None + self._commit_listener: CALLBACK_TYPE | None = None + self._periodic_listener: CALLBACK_TYPE | None = None + self._nightly_listener: CALLBACK_TYPE | None = None self.enabled = True def set_enable(self, enable: bool) -> None: @@ -670,6 +700,22 @@ class Recorder(threading.Thread): self.hass, self._async_check_queue, timedelta(minutes=10) ) + @callback + def 
_async_keep_alive(self, now: datetime) -> None: + """Queue a keep alive.""" + if self._event_listener: + self.queue.put(KEEP_ALIVE_TASK) + + @callback + def _async_commit(self, now: datetime) -> None: + """Queue a commit.""" + if ( + self._event_listener + and not self._database_lock_task + and self._event_session_has_pending_writes() + ): + self.queue.put(COMMIT_TASK) + @callback def async_add_executor_job( self, target: Callable[..., T], *args: Any @@ -709,9 +755,29 @@ class Recorder(threading.Thread): self._event_listener() self._event_listener = None + @callback + def _async_stop_listeners(self) -> None: + """Stop listeners.""" + self._async_stop_queue_watcher_and_event_listener() + if self._keep_alive_listener: + self._keep_alive_listener() + self._keep_alive_listener = None + if self._commit_listener: + self._commit_listener() + self._commit_listener = None + if self._nightly_listener: + self._nightly_listener() + self._nightly_listener = None + if self._periodic_listener: + self._periodic_listener() + self._periodic_listener = None + @callback def _async_event_filter(self, event: Event) -> bool: """Filter events.""" + if event.event_type == EVENT_TIME_CHANGED: + return False + if event.event_type in self.exclude_t: return False @@ -780,7 +846,7 @@ class Recorder(threading.Thread): if not self._hass_started.done(): self._hass_started.set_result(SHUTDOWN_TASK) self.queue.put(StopTask()) - self._async_stop_queue_watcher_and_event_listener() + self._async_stop_listeners() await self.hass.async_add_executor_job(self.join) self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _async_shutdown) @@ -807,7 +873,7 @@ class Recorder(threading.Thread): "The recorder could not start, check [the logs](/config/logs)", "Recorder", ) - self._async_stop_queue_watcher_and_event_listener() + self._async_stop_listeners() @callback def async_connection_success(self) -> None: @@ -864,6 +930,10 @@ class Recorder(threading.Thread): """Schedule external statistics.""" 
self.queue.put(ExternalStatisticsTask(metadata, stats)) + @callback + def _using_sqlite(self) -> bool: + return bool(self.engine and self.engine.dialect.name == "sqlite") + @callback def _async_setup_periodic_tasks(self) -> None: """Prepare periodic tasks.""" @@ -871,13 +941,26 @@ class Recorder(threading.Thread): # Home Assistant is shutting down return + # If the db is using a socket connection, we need to keep alive + # to prevent errors from unexpected disconnects + if not self._using_sqlite(): + self._keep_alive_listener = async_track_time_interval( + self.hass, self._async_keep_alive, timedelta(seconds=KEEPALIVE_TIME) + ) + + # If the commit interval is not 0, we need to commit periodically + if self.commit_interval: + self._commit_listener = async_track_time_interval( + self.hass, self._async_commit, timedelta(seconds=self.commit_interval) + ) + # Run nightly tasks at 4:12am - async_track_time_change( + self._nightly_listener = async_track_time_change( self.hass, self.async_nightly_tasks, hour=4, minute=12, second=0 ) # Compile short term statistics every 5 minutes - async_track_utc_time_change( + self._periodic_listener = async_track_utc_time_change( self.hass, self.async_periodic_statistics, minute=range(0, 60, 5), second=10 ) @@ -947,6 +1030,7 @@ class Recorder(threading.Thread): self.stop_requested = False while not self.stop_requested: task = self.queue.get() + _LOGGER.debug("Processing task: %s", task) try: self._process_one_task_or_recover(task) except Exception as err: # pylint: disable=broad-except @@ -1050,20 +1134,14 @@ class Recorder(threading.Thread): ) def _process_one_event(self, event: Event) -> None: - if event.event_type == EVENT_TIME_CHANGED: - self._keepalive_count += 1 - if self._keepalive_count >= KEEPALIVE_TIME: - self._keepalive_count = 0 - self._send_keep_alive() - if self.commit_interval: - self._timechanges_seen += 1 - if self._timechanges_seen >= self.commit_interval: - self._timechanges_seen = 0 - self._commit_event_session_or_retry()
- return - if not self.enabled: return + self._process_event_into_session(event) + # Commit if the commit interval is zero + if not self.commit_interval: + self._commit_event_session_or_retry() + + def _process_event_into_session(self, event: Event) -> None: assert self.event_session is not None try: @@ -1129,12 +1207,8 @@ class Recorder(threading.Thread): self._pending_expunge.append(dbstate) else: dbstate.state = None - self.event_session.add(dbstate) dbstate.event = dbevent - - # Commit right away if the commit interval is zero - if not self.commit_interval: - self._commit_event_session_or_retry() + self.event_session.add(dbstate) def _handle_database_error(self, err: Exception) -> bool: """Handle a database error that may result in moving away the corrupt db.""" @@ -1146,11 +1220,14 @@ class Recorder(threading.Thread): return True return False + def _event_session_has_pending_writes(self) -> bool: + return bool( + self.event_session and (self.event_session.new or self.event_session.dirty) + ) + def _commit_event_session_or_retry(self) -> None: """Commit the event session if there is work to do.""" - if not self.event_session or ( - not self.event_session.new and not self.event_session.dirty - ): + if not self._event_session_has_pending_writes(): return tries = 1 while tries <= self.db_max_retries: @@ -1197,7 +1274,7 @@ class Recorder(threading.Thread): # Expire is an expensive operation (frequently more expensive # than the flush and commit itself) so we only # do it after EXPIRE_AFTER_COMMITS commits - if self._commits_without_expire == EXPIRE_AFTER_COMMITS: + if self._commits_without_expire >= EXPIRE_AFTER_COMMITS: self._commits_without_expire = 0 self.event_session.expire_all() @@ -1266,7 +1343,7 @@ class Recorder(threading.Thread): async def lock_database(self) -> bool: """Lock database so it can be backed up safely.""" - if not self.engine or self.engine.dialect.name != "sqlite": + if not self._using_sqlite(): _LOGGER.debug( "Not a SQLite database or not 
connected, locking not necessary" ) @@ -1295,7 +1372,7 @@ class Recorder(threading.Thread): Returns true if database lock has been held throughout the process. """ - if not self.engine or self.engine.dialect.name != "sqlite": + if not self._using_sqlite(): _LOGGER.debug( "Not a SQLite database or not connected, unlocking not necessary" ) @@ -1414,7 +1491,7 @@ class Recorder(threading.Thread): def _shutdown(self) -> None: """Save end time for current run.""" - self.hass.add_job(self._async_stop_queue_watcher_and_event_listener) + self.hass.add_job(self._async_stop_listeners) self._stop_executor() self._end_session() self._close_connection() diff --git a/tests/components/recorder/test_init.py b/tests/components/recorder/test_init.py index 903be6ca2cd..43a30932ac5 100644 --- a/tests/components/recorder/test_init.py +++ b/tests/components/recorder/test_init.py @@ -13,6 +13,7 @@ from homeassistant.components import recorder from homeassistant.components.recorder import ( CONF_AUTO_PURGE, CONF_AUTO_REPACK, + CONF_COMMIT_INTERVAL, CONF_DB_URL, CONFIG_SCHEMA, DOMAIN, @@ -186,7 +187,7 @@ async def test_saving_many_states( ): """Test we expire after many commits.""" instance = await async_setup_recorder_instance( - hass, {recorder.CONF_COMMIT_INTERVAL: 1} + hass, {recorder.CONF_COMMIT_INTERVAL: 0} ) entity_id = "test.recorder" @@ -825,7 +826,7 @@ def test_auto_purge_disabled(hass_recorder): dt_util.set_default_time_zone(tz) # Purging is scheduled to happen at 4:12am every day. 
We want - # to verify that when auto purge is disabled perodic db cleanups + # to verify that when auto purge is disabled periodic db cleanups # are still scheduled # # The clock is started at 4:15am then advanced forward below @@ -1208,7 +1209,9 @@ async def test_database_corruption_while_running(hass, tmpdir, caplog): test_db_file = await hass.async_add_executor_job(_create_tmpdir_for_test_db) dburl = f"{SQLITE_URL_PREFIX}//{test_db_file}" - assert await async_setup_component(hass, DOMAIN, {DOMAIN: {CONF_DB_URL: dburl}}) + assert await async_setup_component( + hass, DOMAIN, {DOMAIN: {CONF_DB_URL: dburl, CONF_COMMIT_INTERVAL: 0}} + ) await hass.async_block_till_done() caplog.clear() @@ -1299,7 +1302,10 @@ def test_entity_id_filter(hass_recorder): async def test_database_lock_and_unlock(hass: HomeAssistant, tmp_path): """Test writing events during lock getting written after unlocking.""" # Use file DB, in memory DB cannot do write locks. - config = {recorder.CONF_DB_URL: "sqlite:///" + str(tmp_path / "pytest.db")} + config = { + recorder.CONF_COMMIT_INTERVAL: 0, + recorder.CONF_DB_URL: "sqlite:///" + str(tmp_path / "pytest.db"), + } await async_init_recorder_component(hass, config) await hass.async_block_till_done() @@ -1311,7 +1317,7 @@ async def test_database_lock_and_unlock(hass: HomeAssistant, tmp_path): event_type = "EVENT_TEST" event_data = {"test_attr": 5, "test_attr_10": "nice"} - hass.bus.fire(event_type, event_data) + hass.bus.async_fire(event_type, event_data) task = asyncio.create_task(async_wait_recording_done(hass, instance)) # Recording can't be finished while lock is held @@ -1333,7 +1339,10 @@ async def test_database_lock_and_unlock(hass: HomeAssistant, tmp_path): async def test_database_lock_and_overflow(hass: HomeAssistant, tmp_path): """Test writing events during lock leading to overflow the queue causes the database to unlock.""" # Use file DB, in memory DB cannot do write locks. 
- config = {recorder.CONF_DB_URL: "sqlite:///" + str(tmp_path / "pytest.db")} + config = { + recorder.CONF_COMMIT_INTERVAL: 0, + recorder.CONF_DB_URL: "sqlite:///" + str(tmp_path / "pytest.db"), + } await async_init_recorder_component(hass, config) await hass.async_block_till_done() @@ -1402,3 +1411,42 @@ async def test_in_memory_database(hass, caplog): hass, recorder.DOMAIN, {recorder.DOMAIN: {recorder.CONF_DB_URL: "sqlite://"}} ) assert "In-memory SQLite database is not supported" in caplog.text + + +async def test_database_connection_keep_alive( + hass: HomeAssistant, + async_setup_recorder_instance: SetupRecorderInstanceT, + caplog: pytest.LogCaptureFixture, +): + """Test we keep alive socket based dialects.""" + with patch( + "homeassistant.components.recorder.Recorder._using_sqlite", return_value=False + ): + instance = await async_setup_recorder_instance(hass) + # We have to mock this since we don't have a mock + # MySQL server available in tests. + hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED) + await instance.async_recorder_ready.wait() + + async_fire_time_changed( + hass, dt_util.utcnow() + timedelta(seconds=recorder.KEEPALIVE_TIME) + ) + await async_wait_recording_done(hass, instance) + assert "Sending keepalive" in caplog.text + + +async def test_database_connection_keep_alive_disabled_on_sqlite( + hass: HomeAssistant, + async_setup_recorder_instance: SetupRecorderInstanceT, + caplog: pytest.LogCaptureFixture, +): + """Test we do not do keep alive for sqlite.""" + instance = await async_setup_recorder_instance(hass) + hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED) + await instance.async_recorder_ready.wait() + + async_fire_time_changed( + hass, dt_util.utcnow() + timedelta(seconds=recorder.KEEPALIVE_TIME) + ) + await async_wait_recording_done(hass, instance) + assert "Sending keepalive" not in caplog.text diff --git a/tests/components/recorder/test_migrate.py b/tests/components/recorder/test_migrate.py index a169cdd8356..ee4c16ff8e0 100644 --- 
a/tests/components/recorder/test_migrate.py +++ b/tests/components/recorder/test_migrate.py @@ -206,7 +206,9 @@ async def test_events_during_migration_queue_exhausted(hass): "homeassistant.components.recorder.create_engine", new=create_engine_test ), patch.object(recorder, "MAX_QUEUE_BACKLOG", 1): await async_setup_component( - hass, "recorder", {"recorder": {"db_url": "sqlite://"}} + hass, + "recorder", + {"recorder": {"db_url": "sqlite://", "commit_interval": 0}}, ) hass.states.async_set("my.entity", "on", {}) await hass.async_block_till_done()