From b09b5729a36e69f517703038a38e197ff5c13f42 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Wed, 8 Apr 2020 14:56:22 -0500 Subject: [PATCH] Accommodate mysql servers with a low wait_timeout (#33638) Some providers have set their wait_timeout to 60s in order to pack as many users as they can on a machine. The mysql default is 28800 seconds (8 hours) Since mysql connection build and tear down is relativity expensive, we want to avoid being disconnected. We now accommodate this scenario with the following: 1. Raise the mysql session wait_timeout 28800 when we connect 2. The event session now does a 30 second keep alive to ensure the connection stays open --- homeassistant/components/recorder/__init__.py | 46 ++++++++++++++++--- 1 file changed, 40 insertions(+), 6 deletions(-) diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py index 56f5b53326c..ad160a1ba8f 100644 --- a/homeassistant/components/recorder/__init__.py +++ b/homeassistant/components/recorder/__init__.py @@ -5,12 +5,11 @@ import concurrent.futures from datetime import datetime, timedelta import logging import queue -from sqlite3 import Connection import threading import time from typing import Any, Dict, Optional -from sqlalchemy import create_engine, exc +from sqlalchemy import create_engine, exc, select from sqlalchemy.engine import Engine from sqlalchemy.event import listens_for from sqlalchemy.orm import scoped_session, sessionmaker @@ -61,6 +60,7 @@ DEFAULT_URL = "sqlite:///{hass_config_path}" DEFAULT_DB_FILE = "home-assistant_v2.db" DEFAULT_DB_MAX_RETRIES = 10 DEFAULT_DB_RETRY_WAIT = 3 +KEEPALIVE_TIME = 30 CONF_DB_URL = "db_url" CONF_DB_MAX_RETRIES = "db_max_retries" @@ -223,6 +223,7 @@ class Recorder(threading.Thread): self.exclude_t = exclude.get(CONF_EVENT_TYPES, []) self._timechanges_seen = 0 + self._keepalive_count = 0 self.event_session = None self.get_session = None @@ -353,6 +354,10 @@ class Recorder(threading.Thread): continue if event.event_type == EVENT_TIME_CHANGED: self.queue.task_done() + self._keepalive_count += 1 + if self._keepalive_count >= KEEPALIVE_TIME: + self._keepalive_count = 0 + self._send_keep_alive() if self.commit_interval: self._timechanges_seen += 1 if self._timechanges_seen >= self.commit_interval: @@ -400,6 +405,18 @@ class Recorder(threading.Thread): self.queue.task_done() + def _send_keep_alive(self): + try: + _LOGGER.debug("Sending keepalive") + self.event_session.connection().scalar(select([1])) + return + except Exception as err: # pylint: disable=broad-except + # Must catch the exception to prevent the loop from collapsing + _LOGGER.error( + "Error in database connectivity during keepalive: %s.", err, + ) + self._reopen_event_session() + def _commit_event_session_or_retry(self): tries = 1 while tries <= self.db_max_retries: @@ -419,7 +436,7 @@ class Recorder(threading.Thread): ) else: _LOGGER.error( - "Error in database connectivity: %s. " + "Error in database connectivity during commit: %s. " "(retrying in %s seconds)", err, self.db_retry_wait, @@ -435,6 +452,15 @@ class Recorder(threading.Thread): "Error in database update. Could not save " "after %d tries. Giving up", tries, ) + self._reopen_event_session() + + def _reopen_event_session(self): + try: + self.event_session.rollback() + except Exception as err: # pylint: disable=broad-except + # Must catch the exception to prevent the loop from collapsing + _LOGGER.exception("Error while rolling back event session: %s", err) + try: self.event_session.close() except Exception as err: # pylint: disable=broad-except @@ -470,15 +496,23 @@ class Recorder(threading.Thread): # pylint: disable=unused-variable @listens_for(Engine, "connect") - def set_sqlite_pragma(dbapi_connection, connection_record): - """Set sqlite's WAL mode.""" - if isinstance(dbapi_connection, Connection): + def setup_connection(dbapi_connection, connection_record): + """Dbapi specific connection settings.""" + + # We do not import sqlite3 here so mysql/other + # users do not have to pay for it to be loaded in + # memory + if self.db_url == "sqlite://" or ":memory:" in self.db_url: old_isolation = dbapi_connection.isolation_level dbapi_connection.isolation_level = None cursor = dbapi_connection.cursor() cursor.execute("PRAGMA journal_mode=WAL") cursor.close() dbapi_connection.isolation_level = old_isolation + elif self.db_url.startswith("mysql"): + cursor = dbapi_connection.cursor() + cursor.execute("SET session wait_timeout=28800") + cursor.close() if self.db_url == "sqlite://" or ":memory:" in self.db_url: kwargs["connect_args"] = {"check_same_thread": False}