Add history_stats sensor (#5610)

* Add new sensor/history_stats component

* Add real unit tests

* Code style changes requested for pull request

* Remove time aliases & allow datetime values

* Reformat unit tests

* Remove all async behavior in history_stats

* Change duration format
This commit is contained in:
Boris K 2017-02-10 03:47:12 +01:00 committed by Paulus Schoutsen
parent be08bf0ef7
commit 5ecef6aaac
2 changed files with 493 additions and 0 deletions

View File

@ -0,0 +1,278 @@
"""
Component to make instant statistics about your history.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/sensor.history_stats/
"""
import datetime
import logging
import math
import voluptuous as vol
import homeassistant.components.history as history
import homeassistant.helpers.config_validation as cv
import homeassistant.util.dt as dt_util
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.const import (
CONF_NAME, CONF_ENTITY_ID, CONF_STATE, EVENT_HOMEASSISTANT_START)
from homeassistant.exceptions import TemplateError
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.event import track_state_change
_LOGGER = logging.getLogger(__name__)
DOMAIN = 'history_stats'
DEPENDENCIES = ['history']
CONF_START = 'start'
CONF_END = 'end'
CONF_DURATION = 'duration'
CONF_PERIOD_KEYS = [CONF_START, CONF_END, CONF_DURATION]
DEFAULT_NAME = 'unnamed statistics'
UNIT = 'h'
UNIT_RATIO = '%'
ICON = 'mdi:chart-line'
ATTR_START = 'from'
ATTR_END = 'to'
ATTR_VALUE = 'value'
ATTR_RATIO = 'ratio'
def exactly_two_period_keys(conf):
"""Ensure exactly 2 of CONF_PERIOD_KEYS are provided."""
provided = 0
for param in CONF_PERIOD_KEYS:
if param in conf and conf[param] is not None:
provided += 1
if provided != 2:
raise vol.Invalid('You must provide exactly 2 of the following:'
' start, end, duration')
return conf
PLATFORM_SCHEMA = vol.All(PLATFORM_SCHEMA.extend({
vol.Required(CONF_ENTITY_ID): cv.entity_id,
vol.Required(CONF_STATE): cv.slug,
vol.Optional(CONF_START, default=None): cv.template,
vol.Optional(CONF_END, default=None): cv.template,
vol.Optional(CONF_DURATION, default=None): cv.time_period,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
}), exactly_two_period_keys)
# noinspection PyUnusedLocal
def setup_platform(hass, config, add_devices, discovery_info=None):
"""Set up the History Stats sensor."""
entity_id = config.get(CONF_ENTITY_ID)
entity_state = config.get(CONF_STATE)
start = config.get(CONF_START)
end = config.get(CONF_END)
duration = config.get(CONF_DURATION)
name = config.get(CONF_NAME)
for template in [start, end]:
if template is not None:
template.hass = hass
add_devices([HistoryStatsSensor(
hass, entity_id, entity_state, start, end, duration, name)])
return True
class HistoryStatsSensor(Entity):
"""Representation of a HistoryStats sensor."""
def __init__(
self, hass, entity_id, entity_state, start, end, duration, name):
"""Initialize the HistoryStats sensor."""
self._hass = hass
self._entity_id = entity_id
self._entity_state = entity_state
self._duration = duration
self._start = start
self._end = end
self._name = name
self._unit_of_measurement = UNIT
self._period = (datetime.datetime.now(), datetime.datetime.now())
self.value = 0
def force_refresh(*args):
"""Force the component to refresh."""
self.schedule_update_ha_state(True)
# Update value when home assistant starts
hass.bus.listen_once(EVENT_HOMEASSISTANT_START, force_refresh)
# Update value when tracked entity changes its state
track_state_change(hass, entity_id, force_refresh)
@property
def name(self):
"""Return the name of the sensor."""
return self._name
@property
def state(self):
"""Return the state of the sensor."""
return round(self.value, 2)
@property
def unit_of_measurement(self):
"""Return the unit the value is expressed in."""
return self._unit_of_measurement
@property
def should_poll(self):
"""Polling required."""
return True
@property
def device_state_attributes(self):
"""Return the state attributes of the sensor."""
start, end = self._period
hsh = HistoryStatsHelper
return {
ATTR_VALUE: hsh.pretty_duration(self.value),
ATTR_RATIO: hsh.pretty_ratio(self.value, self._period),
ATTR_START: start.strftime('%Y-%m-%d %H:%M:%S'),
ATTR_END: end.strftime('%Y-%m-%d %H:%M:%S'),
}
@property
def icon(self):
"""Return the icon to use in the frontend, if any."""
return ICON
def update(self):
"""Get the latest data and updates the states."""
# Parse templates
self.update_period()
start, end = self._period
# Convert to UTC
start = dt_util.as_utc(start)
end = dt_util.as_utc(end)
# Get history between start and end
history_list = history.state_changes_during_period(
start, end, str(self._entity_id))
if self._entity_id not in history_list.keys():
return
# Get the first state
last_state = history.get_state(start, self._entity_id)
last_state = (last_state is not None and
last_state == self._entity_state)
last_time = dt_util.as_timestamp(start)
elapsed = 0
# Make calculations
for item in history_list.get(self._entity_id):
current_state = item.state == self._entity_state
current_time = item.last_changed.timestamp()
if last_state:
elapsed += current_time - last_time
last_state = current_state
last_time = current_time
# Save value in hours
self.value = elapsed / 3600
def update_period(self):
"""Parse the templates and store a datetime tuple in _period."""
start = None
end = None
# Parse start
if self._start is not None:
try:
start_rendered = self._start.render()
except TemplateError as ex:
HistoryStatsHelper.handle_template_exception(ex, 'start')
return
start = dt_util.parse_datetime(start_rendered)
if start is None:
try:
start = dt_util.as_local(dt_util.utc_from_timestamp(
math.floor(float(start_rendered))))
except ValueError:
_LOGGER.error('PARSING ERROR: start must be a datetime'
' or a timestamp.')
return
# Parse end
if self._end is not None:
try:
end_rendered = self._end.render()
except TemplateError as ex:
HistoryStatsHelper.handle_template_exception(ex, 'end')
return
end = dt_util.parse_datetime(end_rendered)
if end is None:
try:
end = dt_util.as_local(dt_util.utc_from_timestamp(
math.floor(float(end_rendered))))
except ValueError:
_LOGGER.error('PARSING ERROR: end must be a datetime'
' or a timestamp.')
return
# Calculate start or end using the duration
if start is None:
start = end - self._duration
if end is None:
end = start + self._duration
self._period = start, end
class HistoryStatsHelper:
"""Static methods to make the HistoryStatsSensor code lighter."""
@staticmethod
def pretty_duration(hours):
"""Format a duration in days, hours, minutes, seconds."""
seconds = int(3600 * hours)
days, seconds = divmod(seconds, 86400)
hours, seconds = divmod(seconds, 3600)
minutes, seconds = divmod(seconds, 60)
if days > 0:
return '%dd %dh %dm %ds' % (days, hours, minutes, seconds)
elif hours > 0:
return '%dh %dm %ds' % (hours, minutes, seconds)
elif minutes > 0:
return '%dm %ds' % (minutes, seconds)
else:
return '%ds' % (seconds,)
@staticmethod
def pretty_ratio(value, period):
"""Format the ratio of value / period duration."""
if len(period) != 2 or period[0] == period[1]:
return '0,0' + UNIT_RATIO
ratio = 100 * 3600 * value / (period[1] - period[0]).total_seconds()
return str(round(ratio, 1)) + UNIT_RATIO
@staticmethod
def handle_template_exception(ex, field):
"""Log an error nicely if the template cannot be interpreted."""
if ex.args and ex.args[0].startswith(
"UndefinedError: 'None' has no attribute"):
# Common during HA startup - so just a warning
_LOGGER.warning(ex)
return
_LOGGER.error('Error parsing template for [' + field + ']')
_LOGGER.error(ex)

View File

@ -0,0 +1,215 @@
"""The test for the History Statistics sensor platform."""
# pylint: disable=protected-access
import unittest
from datetime import timedelta
from unittest.mock import patch
import homeassistant.components.recorder as recorder
import homeassistant.core as ha
import homeassistant.util.dt as dt_util
from homeassistant.bootstrap import setup_component
from homeassistant.components.sensor.history_stats import HistoryStatsSensor
from homeassistant.helpers.template import Template
from tests.common import get_test_home_assistant
class TestHistoryStatsSensor(unittest.TestCase):
"""Test the History Statistics sensor."""
def setUp(self):
"""Set up things to be run when tests are started."""
self.hass = get_test_home_assistant()
def tearDown(self):
"""Stop everything that was started."""
self.hass.stop()
def test_setup(self):
"""Test the history statistics sensor setup."""
self.init_recorder()
config = {
'history': {
},
'sensor': {
'platform': 'history_stats',
'entity_id': 'binary_sensor.test_id',
'state': 'on',
'start': '{{ now().replace(hour=0)'
'.replace(minute=0).replace(second=0) }}',
'duration': '02:00',
'name': 'Test',
}
}
self.assertTrue(setup_component(self.hass, 'sensor', config))
state = self.hass.states.get('sensor.test').as_dict()
self.assertEqual(state['state'], '0')
def test_period_parsing(self):
"""Test the conversion from templates to period."""
today = Template('{{ now().replace(hour=0).replace(minute=0)'
'.replace(second=0) }}', self.hass)
duration = timedelta(hours=2, minutes=1)
sensor1 = HistoryStatsSensor(
self.hass, 'test', 'on', today, None, duration, 'test')
sensor2 = HistoryStatsSensor(
self.hass, 'test', 'on', None, today, duration, 'test')
sensor1.update_period()
sensor2.update_period()
self.assertEqual(
sensor1.device_state_attributes['from'][-8:], '00:00:00')
self.assertEqual(
sensor1.device_state_attributes['to'][-8:], '02:01:00')
self.assertEqual(
sensor2.device_state_attributes['from'][-8:], '21:59:00')
self.assertEqual(
sensor2.device_state_attributes['to'][-8:], '00:00:00')
def test_measure(self):
"""Test the history statistics sensor measure."""
later = dt_util.utcnow() - timedelta(seconds=15)
earlier = later - timedelta(minutes=30)
fake_states = {
'binary_sensor.test_id': [
ha.State('binary_sensor.test_id', 'on', last_changed=earlier),
ha.State('binary_sensor.test_id', 'off', last_changed=later),
]
}
start = Template('{{ as_timestamp(now()) - 3600 }}', self.hass)
end = Template('{{ now() }}', self.hass)
sensor1 = HistoryStatsSensor(
self.hass, 'binary_sensor.test_id', 'on', start, end, None, 'Test')
sensor2 = HistoryStatsSensor(
self.hass, 'unknown.id', 'on', start, end, None, 'Test')
with patch('homeassistant.components.history.'
'state_changes_during_period', return_value=fake_states):
with patch('homeassistant.components.history.get_state',
return_value=None):
sensor1.update()
sensor2.update()
self.assertEqual(sensor1.value, 0.5)
self.assertEqual(sensor2.value, 0)
self.assertEqual(sensor1.device_state_attributes['ratio'], '50.0%')
def test_wrong_date(self):
"""Test when start or end value is not a timestamp or a date."""
good = Template('{{ now() }}', self.hass)
bad = Template('{{ TEST }}', self.hass)
sensor1 = HistoryStatsSensor(
self.hass, 'test', 'on', good, bad, None, 'Test')
sensor2 = HistoryStatsSensor(
self.hass, 'test', 'on', bad, good, None, 'Test')
before_update1 = sensor1._period
before_update2 = sensor2._period
sensor1.update_period()
sensor2.update_period()
self.assertEqual(before_update1, sensor1._period)
self.assertEqual(before_update2, sensor2._period)
def test_wrong_duration(self):
"""Test when duration value is not a timedelta."""
self.init_recorder()
config = {
'history': {
},
'sensor': {
'platform': 'history_stats',
'entity_id': 'binary_sensor.test_id',
'name': 'Test',
'state': 'on',
'start': '{{ now() }}',
'duration': 'TEST',
}
}
setup_component(self.hass, 'sensor', config)
self.assertEqual(self.hass.states.get('sensor.test'), None)
self.assertRaises(TypeError,
setup_component(self.hass, 'sensor', config))
def test_bad_template(self):
"""Test Exception when the template cannot be parsed."""
bad = Template('{{ x - 12 }}', self.hass) # x is undefined
duration = '01:00'
sensor1 = HistoryStatsSensor(
self.hass, 'test', 'on', bad, None, duration, 'Test')
sensor2 = HistoryStatsSensor(
self.hass, 'test', 'on', None, bad, duration, 'Test')
before_update1 = sensor1._period
before_update2 = sensor2._period
sensor1.update_period()
sensor2.update_period()
self.assertEqual(before_update1, sensor1._period)
self.assertEqual(before_update2, sensor2._period)
def test_not_enough_arguments(self):
"""Test config when not enough arguments provided."""
self.init_recorder()
config = {
'history': {
},
'sensor': {
'platform': 'history_stats',
'entity_id': 'binary_sensor.test_id',
'name': 'Test',
'state': 'on',
'start': '{{ now() }}',
}
}
setup_component(self.hass, 'sensor', config)
self.assertEqual(self.hass.states.get('sensor.test'), None)
self.assertRaises(TypeError,
setup_component(self.hass, 'sensor', config))
def test_too_many_arguments(self):
"""Test config when too many arguments provided."""
self.init_recorder()
config = {
'history': {
},
'sensor': {
'platform': 'history_stats',
'entity_id': 'binary_sensor.test_id',
'name': 'Test',
'state': 'on',
'start': '{{ as_timestamp(now()) - 3600 }}',
'end': '{{ now() }}',
'duration': '01:00',
}
}
setup_component(self.hass, 'sensor', config)
self.assertEqual(self.hass.states.get('sensor.test'), None)
self.assertRaises(TypeError,
setup_component(self.hass, 'sensor', config))
def init_recorder(self):
"""Initialize the recorder."""
db_uri = 'sqlite://'
with patch('homeassistant.core.Config.path', return_value=db_uri):
setup_component(self.hass, recorder.DOMAIN, {
"recorder": {
"db_url": db_uri}})
self.hass.start()
recorder._INSTANCE.block_till_db_ready()
self.hass.block_till_done()
recorder._INSTANCE.block_till_done()