Purge recorder data by default (#12271)

Anders Melchiorsen 2018-02-11 22:22:59 +01:00 committed by Paulus Schoutsen
parent 219ed7331c
commit 247edf1b69
5 changed files with 63 additions and 48 deletions

View File

@@ -43,10 +43,12 @@ DOMAIN = 'recorder'
 SERVICE_PURGE = 'purge'

 ATTR_KEEP_DAYS = 'keep_days'
+ATTR_REPACK = 'repack'

 SERVICE_PURGE_SCHEMA = vol.Schema({
-    vol.Required(ATTR_KEEP_DAYS):
-        vol.All(vol.Coerce(int), vol.Range(min=0))
+    vol.Optional(ATTR_KEEP_DAYS):
+        vol.All(vol.Coerce(int), vol.Range(min=0)),
+    vol.Optional(ATTR_REPACK, default=False): cv.boolean
 })

 DEFAULT_URL = 'sqlite:///{hass_config_path}'
@@ -76,7 +78,7 @@ FILTER_SCHEMA = vol.Schema({
 CONFIG_SCHEMA = vol.Schema({
     DOMAIN: FILTER_SCHEMA.extend({
-        vol.Optional(CONF_PURGE_KEEP_DAYS):
+        vol.Optional(CONF_PURGE_KEEP_DAYS, default=10):
             vol.All(vol.Coerce(int), vol.Range(min=1)),
         vol.Optional(CONF_PURGE_INTERVAL, default=1):
             vol.All(vol.Coerce(int), vol.Range(min=0)),
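With `purge_keep_days` now defaulting to 10, the recorder purges old rows even when the option is not set in configuration.yaml. A minimal sketch of overriding the new default (the 30-day value is illustrative, not part of this commit):

    # configuration.yaml -- illustrative values only
    recorder:
      purge_keep_days: 30   # keep 30 days of history instead of the new 10-day default
      purge_interval: 1     # days between scheduled purge runs (existing default)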
@@ -122,12 +124,6 @@ def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
     keep_days = conf.get(CONF_PURGE_KEEP_DAYS)
     purge_interval = conf.get(CONF_PURGE_INTERVAL)

-    if keep_days is None and purge_interval != 0:
-        _LOGGER.warning(
-            "From version 0.64.0 the 'recorder' component will by default "
-            "purge data older than 10 days. To keep data longer you must "
-            "configure 'purge_keep_days' or 'purge_interval'.")
-
     db_url = conf.get(CONF_DB_URL, None)
     if not db_url:
         db_url = DEFAULT_URL.format(
@@ -144,7 +140,7 @@ def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
     @asyncio.coroutine
     def async_handle_purge_service(service):
         """Handle calls to the purge service."""
-        instance.do_adhoc_purge(service.data[ATTR_KEEP_DAYS])
+        instance.do_adhoc_purge(**service.data)

     hass.services.async_register(
         DOMAIN, SERVICE_PURGE, async_handle_purge_service,
@@ -153,7 +149,7 @@ def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
     return (yield from instance.async_db_ready)


-PurgeTask = namedtuple('PurgeTask', ['keep_days'])
+PurgeTask = namedtuple('PurgeTask', ['keep_days', 'repack'])


 class Recorder(threading.Thread):
@@ -188,10 +184,12 @@ class Recorder(threading.Thread):
         """Initialize the recorder."""
         self.hass.bus.async_listen(MATCH_ALL, self.event_listener)

-    def do_adhoc_purge(self, keep_days):
+    def do_adhoc_purge(self, **kwargs):
         """Trigger an adhoc purge retaining keep_days worth of data."""
-        if keep_days is not None:
-            self.queue.put(PurgeTask(keep_days))
+        keep_days = kwargs.get(ATTR_KEEP_DAYS, self.keep_days)
+        repack = kwargs.get(ATTR_REPACK)
+
+        self.queue.put(PurgeTask(keep_days, repack))

     def run(self):
         """Start processing events to save."""
@@ -261,7 +259,8 @@ class Recorder(threading.Thread):
             @callback
             def async_purge(now):
                 """Trigger the purge and schedule the next run."""
-                self.queue.put(PurgeTask(self.keep_days))
+                self.queue.put(
+                    PurgeTask(self.keep_days, repack=not self.did_vacuum))

                 self.hass.helpers.event.async_track_point_in_time(
                     async_purge, now + timedelta(days=self.purge_interval))
@@ -294,7 +293,7 @@ class Recorder(threading.Thread):
                 self.queue.task_done()
                 return
             elif isinstance(event, PurgeTask):
-                purge.purge_old_data(self, event.keep_days)
+                purge.purge_old_data(self, event.keep_days, event.repack)
                 self.queue.task_done()
                 continue
             elif event.event_type == EVENT_TIME_CHANGED:

View File

@@ -9,13 +9,14 @@ from .util import session_scope
 _LOGGER = logging.getLogger(__name__)


-def purge_old_data(instance, purge_days):
+def purge_old_data(instance, purge_days, repack):
     """Purge events and states older than purge_days ago."""
     from .models import States, Events
     from sqlalchemy import orm
     from sqlalchemy.sql import exists

     purge_before = dt_util.utcnow() - timedelta(days=purge_days)
+    _LOGGER.debug("Purging events before %s", purge_before)

     with session_scope(session=instance.get_session()) as session:
         # For each entity, the most recent state is protected from deletion
@@ -55,10 +56,10 @@ def purge_old_data(instance, purge_days):
     # Execute sqlite vacuum command to free up space on disk
     _LOGGER.debug("DB engine driver: %s", instance.engine.driver)
-    if instance.engine.driver == 'pysqlite' and not instance.did_vacuum:
+    if repack and instance.engine.driver == 'pysqlite':
         from sqlalchemy import exc

-        _LOGGER.info("Vacuuming SQLite to free space")
+        _LOGGER.debug("Vacuuming SQLite to free space")
         try:
             instance.engine.execute("VACUUM")
             instance.did_vacuum = True

View File

@@ -6,3 +6,6 @@ purge:
     keep_days:
       description: Number of history days to keep in database after purge. Value >= 0.
       example: 2
+    repack:
+      description: Attempt to save disk space by rewriting the entire database file.
+      example: true
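With `keep_days` now optional and `repack` added, the purge service accepts either field, both, or neither (falling back to the configured `purge_keep_days`). A hedged example of service data as it might be sent from a script; the script name and wrapper are illustrative, only the service and field names come from this commit:

    # Illustrative call of the recorder.purge service
    script:
      compact_history:
        sequence:
          - service: recorder.purge
            data:
              keep_days: 2    # optional; defaults to the recorder's configured keep_days
              repack: true    # optional; VACUUM the SQLite database after purging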

View File

@@ -95,10 +95,6 @@ conversation:
 # Enables support for tracking state changes over time
 history:

-# Tracked history is kept for 10 days
-recorder:
-  purge_keep_days: 10
-
 # View all events in a logbook
 logbook:

View File

@@ -16,9 +16,8 @@ class TestRecorderPurge(unittest.TestCase):
     def setUp(self):  # pylint: disable=invalid-name
         """Setup things to be run when tests are started."""
-        config = {'purge_keep_days': 4, 'purge_interval': 2}
         self.hass = get_test_home_assistant()
-        init_recorder_component(self.hass, config)
+        init_recorder_component(self.hass)
         self.hass.start()

     def tearDown(self):  # pylint: disable=invalid-name
@@ -29,14 +28,18 @@ class TestRecorderPurge(unittest.TestCase):
         """Add multiple states to the db for testing."""
         now = datetime.now()
         five_days_ago = now - timedelta(days=5)
+        eleven_days_ago = now - timedelta(days=11)
         attributes = {'test_attr': 5, 'test_attr_10': 'nice'}

         self.hass.block_till_done()
         self.hass.data[DATA_INSTANCE].block_till_done()
         with recorder.session_scope(hass=self.hass) as session:
-            for event_id in range(5):
-                if event_id < 3:
+            for event_id in range(6):
+                if event_id < 2:
+                    timestamp = eleven_days_ago
+                    state = 'autopurgeme'
+                elif event_id < 4:
                     timestamp = five_days_ago
                     state = 'purgeme'
                 else:
@@ -65,9 +68,9 @@ class TestRecorderPurge(unittest.TestCase):
                 domain='sensor',
                 state='iamprotected',
                 attributes=json.dumps(attributes),
-                last_changed=five_days_ago,
-                last_updated=five_days_ago,
-                created=five_days_ago,
+                last_changed=eleven_days_ago,
+                last_updated=eleven_days_ago,
+                created=eleven_days_ago,
                 event_id=protected_event_id
             ))
@@ -75,14 +78,18 @@ class TestRecorderPurge(unittest.TestCase):
         """Add a few events for testing."""
         now = datetime.now()
         five_days_ago = now - timedelta(days=5)
+        eleven_days_ago = now - timedelta(days=11)
         event_data = {'test_attr': 5, 'test_attr_10': 'nice'}

         self.hass.block_till_done()
         self.hass.data[DATA_INSTANCE].block_till_done()
         with recorder.session_scope(hass=self.hass) as session:
-            for event_id in range(5):
+            for event_id in range(6):
                 if event_id < 2:
+                    timestamp = eleven_days_ago
+                    event_type = 'EVENT_TEST_AUTOPURGE'
+                elif event_id < 4:
                     timestamp = five_days_ago
                     event_type = 'EVENT_TEST_PURGE'
                 else:
@@ -102,8 +109,8 @@ class TestRecorderPurge(unittest.TestCase):
                 event_type='EVENT_TEST_FOR_PROTECTED',
                 event_data=json.dumps(event_data),
                 origin='LOCAL',
-                created=five_days_ago,
-                time_fired=five_days_ago,
+                created=eleven_days_ago,
+                time_fired=eleven_days_ago,
             )
             session.add(protected_event)
             session.flush()
@@ -113,13 +120,13 @@ class TestRecorderPurge(unittest.TestCase):
     def test_purge_old_states(self):
         """Test deleting old states."""
         self._add_test_states()
-        # make sure we start with 6 states
+        # make sure we start with 7 states
         with session_scope(hass=self.hass) as session:
             states = session.query(States)
-            self.assertEqual(states.count(), 6)
+            self.assertEqual(states.count(), 7)

             # run purge_old_data()
-            purge_old_data(self.hass.data[DATA_INSTANCE], 4)
+            purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False)

             # we should only have 3 states left after purging
             self.assertEqual(states.count(), 3)
@@ -131,13 +138,13 @@ class TestRecorderPurge(unittest.TestCase):
         with session_scope(hass=self.hass) as session:
             events = session.query(Events).filter(
                 Events.event_type.like("EVENT_TEST%"))
-            self.assertEqual(events.count(), 6)
+            self.assertEqual(events.count(), 7)

             # run purge_old_data()
-            purge_old_data(self.hass.data[DATA_INSTANCE], 4)
+            purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False)

-            # now we should only have 3 events left
-            self.assertEqual(events.count(), 3)
+            # no state to protect, now we should only have 2 events left
+            self.assertEqual(events.count(), 2)

     def test_purge_method(self):
         """Test purge method."""
@@ -148,24 +155,24 @@ class TestRecorderPurge(unittest.TestCase):
         # make sure we start with 6 states
         with session_scope(hass=self.hass) as session:
             states = session.query(States)
-            self.assertEqual(states.count(), 6)
+            self.assertEqual(states.count(), 7)

             events = session.query(Events).filter(
                 Events.event_type.like("EVENT_TEST%"))
-            self.assertEqual(events.count(), 6)
+            self.assertEqual(events.count(), 7)

             self.hass.data[DATA_INSTANCE].block_till_done()

-            # run purge method - no service data, should not work
+            # run purge method - no service data, use defaults
             self.hass.services.call('recorder', 'purge')
             self.hass.async_block_till_done()

             # Small wait for recorder thread
             self.hass.data[DATA_INSTANCE].block_till_done()

-            # we should still have everything from before
-            self.assertEqual(states.count(), 6)
-            self.assertEqual(events.count(), 6)
+            # only purged old events
+            self.assertEqual(states.count(), 5)
+            self.assertEqual(events.count(), 5)

             # run purge method - correct service data
             self.hass.services.call('recorder', 'purge',
@@ -182,11 +189,20 @@ class TestRecorderPurge(unittest.TestCase):
             self.assertTrue('iamprotected' in (
                 state.state for state in states))

-            # now we should only have 4 events left
-            self.assertEqual(events.count(), 4)
+            # now we should only have 3 events left
+            self.assertEqual(events.count(), 3)

             # and the protected event is among them
             self.assertTrue('EVENT_TEST_FOR_PROTECTED' in (
                 event.event_type for event in events.all()))
             self.assertFalse('EVENT_TEST_PURGE' in (
                 event.event_type for event in events.all()))

+            # run purge method - correct service data, with repack
+            service_data['repack'] = True
+            self.assertFalse(self.hass.data[DATA_INSTANCE].did_vacuum)
+            self.hass.services.call('recorder', 'purge',
+                                    service_data=service_data)
+            self.hass.async_block_till_done()
+            self.hass.data[DATA_INSTANCE].block_till_done()
+            self.assertTrue(self.hass.data[DATA_INSTANCE].did_vacuum)