Tweak recorder/restore_state (#6412)

* Tweak recorder/restore_state

* Lint
This commit is contained in:
Paulus Schoutsen 2017-03-05 01:52:08 -08:00 committed by Pascal Vizeli
parent 2650c73a89
commit e8a22cb4a8
4 changed files with 51 additions and 31 deletions

View File

@ -76,7 +76,7 @@ def wait_connection_ready(hass):
Returns a coroutine object. Returns a coroutine object.
""" """
return hass.data[DATA_INSTANCE].async_db_ready.wait() return hass.data[DATA_INSTANCE].async_db_ready
def run_information(hass, point_in_time: Optional[datetime]=None): def run_information(hass, point_in_time: Optional[datetime]=None):
@ -113,13 +113,13 @@ def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
include = conf.get(CONF_INCLUDE, {}) include = conf.get(CONF_INCLUDE, {})
exclude = conf.get(CONF_EXCLUDE, {}) exclude = conf.get(CONF_EXCLUDE, {})
hass.data[DATA_INSTANCE] = Recorder( instance = hass.data[DATA_INSTANCE] = Recorder(
hass, purge_days=purge_days, uri=db_url, include=include, hass, purge_days=purge_days, uri=db_url, include=include,
exclude=exclude) exclude=exclude)
hass.data[DATA_INSTANCE].async_initialize() instance.async_initialize()
hass.data[DATA_INSTANCE].start() instance.start()
return True return (yield from instance.async_db_ready)
class Recorder(threading.Thread): class Recorder(threading.Thread):
@ -135,7 +135,7 @@ class Recorder(threading.Thread):
self.queue = queue.Queue() # type: Any self.queue = queue.Queue() # type: Any
self.recording_start = dt_util.utcnow() self.recording_start = dt_util.utcnow()
self.db_url = uri self.db_url = uri
self.async_db_ready = asyncio.Event(loop=hass.loop) self.async_db_ready = asyncio.Future(loop=hass.loop)
self.engine = None # type: Any self.engine = None # type: Any
self.run_info = None # type: Any self.run_info = None # type: Any
@ -156,22 +156,33 @@ class Recorder(threading.Thread):
from .models import States, Events from .models import States, Events
from homeassistant.components import persistent_notification from homeassistant.components import persistent_notification
while True: tries = 1
connected = False
while not connected and tries < 5:
try: try:
self._setup_connection() self._setup_connection()
migration.migrate_schema(self) migration.migrate_schema(self)
self._setup_run() self._setup_run()
self.hass.loop.call_soon_threadsafe(self.async_db_ready.set) connected = True
break
except Exception as err: # pylint: disable=broad-except except Exception as err: # pylint: disable=broad-except
_LOGGER.error("Error during connection setup: %s (retrying " _LOGGER.error("Error during connection setup: %s (retrying "
"in %s seconds)", err, CONNECT_RETRY_WAIT) "in %s seconds)", err, CONNECT_RETRY_WAIT)
time.sleep(CONNECT_RETRY_WAIT) time.sleep(CONNECT_RETRY_WAIT)
retry = locals().setdefault('retry', 10) - 1 tries += 1
if retry == 0:
msg = "The recorder could not start, please check the log" if not connected:
persistent_notification.create(self.hass, msg, 'Recorder') @callback
return def connection_failed():
"""Connection failed tasks."""
self.async_db_ready.set_result(False)
persistent_notification.async_create(
self.hass,
"The recorder could not start, please check the log",
"Recorder")
self.hass.add_job(connection_failed)
return
purge_task = object() purge_task = object()
shutdown_task = object() shutdown_task = object()
@ -180,6 +191,8 @@ class Recorder(threading.Thread):
@callback @callback
def register(): def register():
"""Post connection initialize.""" """Post connection initialize."""
self.async_db_ready.set_result(True)
def shutdown(event): def shutdown(event):
"""Shut down the Recorder.""" """Shut down the Recorder."""
if not hass_started.done(): if not hass_started.done():
@ -279,19 +292,20 @@ class Recorder(threading.Thread):
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker
from . import models from . import models
kwargs = {}
if self.db_url == 'sqlite://' or ':memory:' in self.db_url: if self.db_url == 'sqlite://' or ':memory:' in self.db_url:
from sqlalchemy.pool import StaticPool from sqlalchemy.pool import StaticPool
self.engine = create_engine(
'sqlite://',
connect_args={'check_same_thread': False},
poolclass=StaticPool,
pool_reset_on_return=None)
else:
self.engine = create_engine(self.db_url, echo=False)
kwargs['connect_args'] = {'check_same_thread': False}
kwargs['poolclass'] = StaticPool
kwargs['pool_reset_on_return'] = None
else:
kwargs['echo'] = False
self.engine = create_engine(self.db_url, **kwargs)
models.Base.metadata.create_all(self.engine) models.Base.metadata.create_all(self.engine)
session_factory = sessionmaker(bind=self.engine) self.get_session = scoped_session(sessionmaker(bind=self.engine))
self.get_session = scoped_session(session_factory)
def _close_connection(self): def _close_connection(self):
"""Close the connection.""" """Close the connection."""

View File

@ -3,6 +3,8 @@ import asyncio
import logging import logging
from datetime import timedelta from datetime import timedelta
import async_timeout
from homeassistant.core import HomeAssistant, CoreState, callback from homeassistant.core import HomeAssistant, CoreState, callback
from homeassistant.const import EVENT_HOMEASSISTANT_START from homeassistant.const import EVENT_HOMEASSISTANT_START
from homeassistant.components.history import get_states, last_recorder_run from homeassistant.components.history import get_states, last_recorder_run
@ -10,10 +12,10 @@ from homeassistant.components.recorder import (
wait_connection_ready, DOMAIN as _RECORDER) wait_connection_ready, DOMAIN as _RECORDER)
import homeassistant.util.dt as dt_util import homeassistant.util.dt as dt_util
_LOGGER = logging.getLogger(__name__) RECORDER_TIMEOUT = 10
DATA_RESTORE_CACHE = 'restore_state_cache' DATA_RESTORE_CACHE = 'restore_state_cache'
_LOCK = 'restore_lock' _LOCK = 'restore_lock'
_LOGGER = logging.getLogger(__name__)
def _load_restore_cache(hass: HomeAssistant): def _load_restore_cache(hass: HomeAssistant):
@ -58,7 +60,14 @@ def async_get_last_state(hass, entity_id: str):
hass.state) hass.state)
return None return None
yield from wait_connection_ready(hass) try:
with async_timeout.timeout(RECORDER_TIMEOUT, loop=hass.loop):
connected = yield from wait_connection_ready(hass)
except asyncio.TimeoutError:
return None
if not connected:
return None
if _LOCK not in hass.data: if _LOCK not in hass.data:
hass.data[_LOCK] = asyncio.Lock(loop=hass.loop) hass.data[_LOCK] = asyncio.Lock(loop=hass.loop)

View File

@ -29,8 +29,7 @@ from homeassistant.components import sun, mqtt, recorder
from homeassistant.components.http.auth import auth_middleware from homeassistant.components.http.auth import auth_middleware
from homeassistant.components.http.const import ( from homeassistant.components.http.const import (
KEY_USE_X_FORWARDED_FOR, KEY_BANS_ENABLED, KEY_TRUSTED_NETWORKS) KEY_USE_X_FORWARDED_FOR, KEY_BANS_ENABLED, KEY_TRUSTED_NETWORKS)
from homeassistant.util.async import ( from homeassistant.util.async import run_callback_threadsafe
run_callback_threadsafe, run_coroutine_threadsafe)
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
INST_COUNT = 0 INST_COUNT = 0
@ -477,8 +476,6 @@ def init_recorder_component(hass, add_config=None):
assert setup_component(hass, recorder.DOMAIN, assert setup_component(hass, recorder.DOMAIN,
{recorder.DOMAIN: config}) {recorder.DOMAIN: config})
assert recorder.DOMAIN in hass.config.components assert recorder.DOMAIN in hass.config.components
run_coroutine_threadsafe(
recorder.wait_connection_ready(hass), hass.loop).result()
_LOGGER.info("In-memory recorder successfully started") _LOGGER.info("In-memory recorder successfully started")

View File

@ -34,7 +34,7 @@ def test_caching_data(hass):
patch('homeassistant.helpers.restore_state.get_states', patch('homeassistant.helpers.restore_state.get_states',
return_value=states), \ return_value=states), \
patch('homeassistant.helpers.restore_state.wait_connection_ready', patch('homeassistant.helpers.restore_state.wait_connection_ready',
return_value=mock_coro()): return_value=mock_coro(True)):
state = yield from async_get_last_state(hass, 'input_boolean.b1') state = yield from async_get_last_state(hass, 'input_boolean.b1')
assert DATA_RESTORE_CACHE in hass.data assert DATA_RESTORE_CACHE in hass.data