Merge pull request #2059 from infamy/justyns-purge_old_data

Justyns purge old data
This commit is contained in:
Alex Harvey 2016-05-14 23:55:28 -07:00
commit ffbc99fac2
2 changed files with 149 additions and 6 deletions

View File

@ -13,7 +13,8 @@ import logging
import queue import queue
import sqlite3 import sqlite3
import threading import threading
from datetime import date, datetime from datetime import date, datetime, timedelta
import voluptuous as vol
import homeassistant.util.dt as dt_util import homeassistant.util.dt as dt_util
from homeassistant.const import ( from homeassistant.const import (
@ -21,6 +22,7 @@ from homeassistant.const import (
EVENT_TIME_CHANGED, MATCH_ALL) EVENT_TIME_CHANGED, MATCH_ALL)
from homeassistant.core import Event, EventOrigin, State from homeassistant.core import Event, EventOrigin, State
from homeassistant.remote import JSONEncoder from homeassistant.remote import JSONEncoder
from homeassistant.helpers.event import track_point_in_utc_time
DOMAIN = "recorder" DOMAIN = "recorder"
@ -30,6 +32,15 @@ RETURN_ROWCOUNT = "rowcount"
RETURN_LASTROWID = "lastrowid" RETURN_LASTROWID = "lastrowid"
RETURN_ONE_ROW = "one_row" RETURN_ONE_ROW = "one_row"
CONF_PURGE_DAYS = "purge_days"
CONFIG_SCHEMA = vol.Schema({
DOMAIN: vol.Schema({
vol.Optional(CONF_PURGE_DAYS): vol.All(vol.Coerce(int),
vol.Range(min=1)),
})
}, extra=vol.ALLOW_EXTRA)
_INSTANCE = None _INSTANCE = None
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -102,14 +113,14 @@ def setup(hass, config):
"""Setup the recorder.""" """Setup the recorder."""
# pylint: disable=global-statement # pylint: disable=global-statement
global _INSTANCE global _INSTANCE
purge_days = config.get(DOMAIN, {}).get(CONF_PURGE_DAYS)
_INSTANCE = Recorder(hass) _INSTANCE = Recorder(hass, purge_days=purge_days)
return True return True
class RecorderRun(object): class RecorderRun(object):
"""Representation of arecorder run.""" """Representation of a recorder run."""
def __init__(self, row=None): def __init__(self, row=None):
"""Initialize the recorder run.""" """Initialize the recorder run."""
@ -169,11 +180,12 @@ class Recorder(threading.Thread):
"""A threaded recorder class.""" """A threaded recorder class."""
# pylint: disable=too-many-instance-attributes # pylint: disable=too-many-instance-attributes
def __init__(self, hass): def __init__(self, hass, purge_days):
"""Initialize the recorder.""" """Initialize the recorder."""
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.hass = hass self.hass = hass
self.purge_days = purge_days
self.conn = None self.conn = None
self.queue = queue.Queue() self.queue = queue.Queue()
self.quit_object = object() self.quit_object = object()
@ -194,6 +206,10 @@ class Recorder(threading.Thread):
"""Start processing events to save.""" """Start processing events to save."""
self._setup_connection() self._setup_connection()
self._setup_run() self._setup_run()
if self.purge_days is not None:
track_point_in_utc_time(self.hass,
lambda now: self._purge_old_data(),
dt_util.utcnow() + timedelta(minutes=5))
while True: while True:
event = self.queue.get() event = self.queue.get()
@ -475,6 +491,32 @@ class Recorder(threading.Thread):
"UPDATE recorder_runs SET end=? WHERE start=?", "UPDATE recorder_runs SET end=? WHERE start=?",
(dt_util.utcnow(), self.recording_start)) (dt_util.utcnow(), self.recording_start))
def _purge_old_data(self):
"""Purge events and states older than purge_days ago."""
if not self.purge_days or self.purge_days < 1:
_LOGGER.debug("purge_days set to %s, will not purge any old data.",
self.purge_days)
return
purge_before = dt_util.utcnow() - timedelta(days=self.purge_days)
_LOGGER.info("Purging events created before %s", purge_before)
deleted_rows = self.query(
sql_query="DELETE FROM events WHERE created < ?;",
data=(int(purge_before.timestamp()),),
return_value=RETURN_ROWCOUNT)
_LOGGER.debug("Deleted %s events", deleted_rows)
_LOGGER.info("Purging states created before %s", purge_before)
deleted_rows = self.query(
sql_query="DELETE FROM states WHERE created < ?;",
data=(int(purge_before.timestamp()),),
return_value=RETURN_ROWCOUNT)
_LOGGER.debug("Deleted %s states", deleted_rows)
# Execute sqlite vacuum command to free up space on disk
self.query("VACUUM;")
def _adapt_datetime(datetimestamp): def _adapt_datetime(datetimestamp):
"""Turn a datetime into an integer for in the DB.""" """Turn a datetime into an integer for in the DB."""

View File

@ -1,6 +1,8 @@
"""The tests for the Recorder component.""" """The tests for the Recorder component."""
# pylint: disable=too-many-public-methods,protected-access # pylint: disable=too-many-public-methods,protected-access
import unittest import unittest
import time
import json
from unittest.mock import patch from unittest.mock import patch
from homeassistant.const import MATCH_ALL from homeassistant.const import MATCH_ALL
@ -10,7 +12,7 @@ from tests.common import get_test_home_assistant
class TestRecorder(unittest.TestCase): class TestRecorder(unittest.TestCase):
"""Test the chromecast module.""" """Test the recorder module."""
def setUp(self): # pylint: disable=invalid-name def setUp(self): # pylint: disable=invalid-name
"""Setup things to be run when tests are started.""" """Setup things to be run when tests are started."""
@ -25,6 +27,52 @@ class TestRecorder(unittest.TestCase):
self.hass.stop() self.hass.stop()
recorder._INSTANCE.block_till_done() recorder._INSTANCE.block_till_done()
def _add_test_states(self):
"""Add multiple states to the db for testing."""
now = int(time.time())
five_days_ago = now - (60*60*24*5)
attributes = {'test_attr': 5, 'test_attr_10': 'nice'}
self.hass.pool.block_till_done()
recorder._INSTANCE.block_till_done()
for event_id in range(5):
if event_id < 3:
timestamp = five_days_ago
state = 'purgeme'
else:
timestamp = now
state = 'dontpurgeme'
recorder.query("INSERT INTO states ("
"entity_id, domain, state, attributes,"
"last_changed, last_updated, created,"
"utc_offset, event_id)"
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
('test.recorder2', 'sensor', state,
json.dumps(attributes), timestamp, timestamp,
timestamp, -18000, event_id + 1000))
def _add_test_events(self):
"""Add a few events for testing."""
now = int(time.time())
five_days_ago = now - (60*60*24*5)
event_data = {'test_attr': 5, 'test_attr_10': 'nice'}
self.hass.pool.block_till_done()
recorder._INSTANCE.block_till_done()
for event_id in range(5):
if event_id < 2:
timestamp = five_days_ago
event_type = 'EVENT_TEST_PURGE'
else:
timestamp = now
event_type = 'EVENT_TEST'
recorder.query("INSERT INTO events"
"(event_type, event_data, origin, created,"
"time_fired, utc_offset)"
"VALUES (?, ?, ?, ?, ?, ?)",
(event_type, json.dumps(event_data), 'LOCAL',
timestamp, timestamp, -18000))
def test_saving_state(self): def test_saving_state(self):
"""Test saving and restoring a state.""" """Test saving and restoring a state."""
entity_id = 'test.recorder' entity_id = 'test.recorder'
@ -76,3 +124,56 @@ class TestRecorder(unittest.TestCase):
# Recorder uses SQLite and stores datetimes as integer unix timestamps # Recorder uses SQLite and stores datetimes as integer unix timestamps
assert event.time_fired.replace(microsecond=0) == \ assert event.time_fired.replace(microsecond=0) == \
db_event.time_fired.replace(microsecond=0) db_event.time_fired.replace(microsecond=0)
def test_purge_old_states(self):
"""Test deleting old states."""
self._add_test_states()
# make sure we start with 5 states
states = recorder.query_states('SELECT * FROM states')
self.assertEqual(len(states), 5)
# run purge_old_data()
recorder._INSTANCE.purge_days = 4
recorder._INSTANCE._purge_old_data()
# we should only have 2 states left after purging
states = recorder.query_states('SELECT * FROM states')
self.assertEqual(len(states), 2)
def test_purge_old_events(self):
"""Test deleting old events."""
self._add_test_events()
events = recorder.query_events('SELECT * FROM events WHERE '
'event_type LIKE "EVENT_TEST%"')
self.assertEqual(len(events), 5)
# run purge_old_data()
recorder._INSTANCE.purge_days = 4
recorder._INSTANCE._purge_old_data()
# now we should only have 3 events left
events = recorder.query_events('SELECT * FROM events WHERE '
'event_type LIKE "EVENT_TEST%"')
self.assertEqual(len(events), 3)
def test_purge_disabled(self):
"""Test leaving purge_days disabled."""
self._add_test_states()
self._add_test_events()
# make sure we start with 5 states and events
states = recorder.query_states('SELECT * FROM states')
events = recorder.query_events('SELECT * FROM events WHERE '
'event_type LIKE "EVENT_TEST%"')
self.assertEqual(len(states), 5)
self.assertEqual(len(events), 5)
# run purge_old_data()
recorder._INSTANCE.purge_days = None
recorder._INSTANCE._purge_old_data()
# we should have all of our states still
states = recorder.query_states('SELECT * FROM states')
events = recorder.query_events('SELECT * FROM events WHERE '
'event_type LIKE "EVENT_TEST%"')
self.assertEqual(len(states), 5)
self.assertEqual(len(events), 5)