Binary sensor for detecting linear trends (#9808)

* Trend sensor now uses linear regression to calculate trend

* Added numpy to trend sensor test requirements

* Added trendline tests

* Trend sensor now has max_samples attribute

* Trend sensor uses utcnow from HA utils

* Trend sensor now completes setup in async_added_to_hass

* Fixed linter issues

* Fixed broken import

* Trend tests make use of max_samples

* Added @asyncio.coroutine decorator to trend update callback

* Update trend.py
This commit is contained in:
Sam Birch 2017-10-26 04:33:17 +13:00 committed by Pascal Vizeli
parent 63c9d59d54
commit fc8940111d
5 changed files with 199 additions and 49 deletions

View File

@ -1,11 +1,13 @@
""" """
A sensor that monitors trands in other components. A sensor that monitors trends in other components.
For more details about this platform, please refer to the documentation at For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/sensor.trend/ https://home-assistant.io/components/sensor.trend/
""" """
import asyncio import asyncio
from collections import deque
import logging import logging
import math
import voluptuous as vol import voluptuous as vol
@ -16,21 +18,40 @@ from homeassistant.components.binary_sensor import (
BinarySensorDevice, ENTITY_ID_FORMAT, PLATFORM_SCHEMA, BinarySensorDevice, ENTITY_ID_FORMAT, PLATFORM_SCHEMA,
DEVICE_CLASSES_SCHEMA) DEVICE_CLASSES_SCHEMA)
from homeassistant.const import ( from homeassistant.const import (
ATTR_FRIENDLY_NAME, ATTR_ENTITY_ID, CONF_DEVICE_CLASS, STATE_UNKNOWN) ATTR_ENTITY_ID, ATTR_FRIENDLY_NAME,
CONF_DEVICE_CLASS, CONF_ENTITY_ID, CONF_FRIENDLY_NAME,
STATE_UNKNOWN)
from homeassistant.helpers.entity import generate_entity_id from homeassistant.helpers.entity import generate_entity_id
from homeassistant.helpers.event import track_state_change from homeassistant.helpers.event import async_track_state_change
from homeassistant.util import utcnow
REQUIREMENTS = ['numpy==1.13.3']
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
ATTR_ATTRIBUTE = 'attribute'
ATTR_GRADIENT = 'gradient'
ATTR_MIN_GRADIENT = 'min_gradient'
ATTR_INVERT = 'invert'
ATTR_SAMPLE_DURATION = 'sample_duration'
ATTR_SAMPLE_COUNT = 'sample_count'
CONF_SENSORS = 'sensors' CONF_SENSORS = 'sensors'
CONF_ATTRIBUTE = 'attribute' CONF_ATTRIBUTE = 'attribute'
CONF_MAX_SAMPLES = 'max_samples'
CONF_MIN_GRADIENT = 'min_gradient'
CONF_INVERT = 'invert' CONF_INVERT = 'invert'
CONF_SAMPLE_DURATION = 'sample_duration'
SENSOR_SCHEMA = vol.Schema({ SENSOR_SCHEMA = vol.Schema({
vol.Required(ATTR_ENTITY_ID): cv.entity_id, vol.Required(CONF_ENTITY_ID): cv.entity_id,
vol.Optional(CONF_ATTRIBUTE): cv.string, vol.Optional(CONF_ATTRIBUTE): cv.string,
vol.Optional(ATTR_FRIENDLY_NAME): cv.string,
vol.Optional(CONF_INVERT, default=False): cv.boolean,
vol.Optional(CONF_DEVICE_CLASS): DEVICE_CLASSES_SCHEMA, vol.Optional(CONF_DEVICE_CLASS): DEVICE_CLASSES_SCHEMA,
vol.Optional(CONF_FRIENDLY_NAME): cv.string,
vol.Optional(CONF_MAX_SAMPLES, default=2): cv.positive_int,
vol.Optional(CONF_MIN_GRADIENT, default=0.0): vol.Coerce(float),
vol.Optional(CONF_INVERT, default=False): cv.boolean,
vol.Optional(CONF_SAMPLE_DURATION, default=0): cv.positive_int,
}) })
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({ PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
@ -43,17 +64,21 @@ def setup_platform(hass, config, add_devices, discovery_info=None):
"""Set up the trend sensors.""" """Set up the trend sensors."""
sensors = [] sensors = []
for device, device_config in config[CONF_SENSORS].items(): for device_id, device_config in config[CONF_SENSORS].items():
entity_id = device_config[ATTR_ENTITY_ID] entity_id = device_config[ATTR_ENTITY_ID]
attribute = device_config.get(CONF_ATTRIBUTE) attribute = device_config.get(CONF_ATTRIBUTE)
friendly_name = device_config.get(ATTR_FRIENDLY_NAME, device)
device_class = device_config.get(CONF_DEVICE_CLASS) device_class = device_config.get(CONF_DEVICE_CLASS)
friendly_name = device_config.get(ATTR_FRIENDLY_NAME, device_id)
invert = device_config[CONF_INVERT] invert = device_config[CONF_INVERT]
max_samples = device_config[CONF_MAX_SAMPLES]
min_gradient = device_config[CONF_MIN_GRADIENT]
sample_duration = device_config[CONF_SAMPLE_DURATION]
sensors.append( sensors.append(
SensorTrend( SensorTrend(
hass, device, friendly_name, entity_id, attribute, hass, device_id, friendly_name, entity_id, attribute,
device_class, invert) device_class, invert, max_samples, min_gradient,
sample_duration)
) )
if not sensors: if not sensors:
_LOGGER.error("No sensors added") _LOGGER.error("No sensors added")
@ -65,30 +90,23 @@ def setup_platform(hass, config, add_devices, discovery_info=None):
class SensorTrend(BinarySensorDevice): class SensorTrend(BinarySensorDevice):
"""Representation of a trend Sensor.""" """Representation of a trend Sensor."""
def __init__(self, hass, device_id, friendly_name, def __init__(self, hass, device_id, friendly_name, entity_id,
target_entity, attribute, device_class, invert): attribute, device_class, invert, max_samples,
min_gradient, sample_duration):
"""Initialize the sensor.""" """Initialize the sensor."""
self._hass = hass self._hass = hass
self.entity_id = generate_entity_id( self.entity_id = generate_entity_id(
ENTITY_ID_FORMAT, device_id, hass=hass) ENTITY_ID_FORMAT, device_id, hass=hass)
self._name = friendly_name self._name = friendly_name
self._target_entity = target_entity self._entity_id = entity_id
self._attribute = attribute self._attribute = attribute
self._device_class = device_class self._device_class = device_class
self._invert = invert self._invert = invert
self._sample_duration = sample_duration
self._min_gradient = min_gradient
self._gradient = None
self._state = None self._state = None
self.from_state = None self.samples = deque(maxlen=max_samples)
self.to_state = None
@callback
def trend_sensor_state_listener(entity, old_state, new_state):
"""Handle the target device state changes."""
self.from_state = old_state
self.to_state = new_state
hass.async_add_job(self.async_update_ha_state(True))
track_state_change(hass, target_entity,
trend_sensor_state_listener)
@property @property
def name(self): def name(self):
@ -105,33 +123,77 @@ class SensorTrend(BinarySensorDevice):
"""Return the sensor class of the sensor.""" """Return the sensor class of the sensor."""
return self._device_class return self._device_class
@property
def device_state_attributes(self):
"""Return the state attributes of the sensor."""
return {
ATTR_ENTITY_ID: self._entity_id,
ATTR_FRIENDLY_NAME: self._name,
ATTR_INVERT: self._invert,
ATTR_GRADIENT: self._gradient,
ATTR_MIN_GRADIENT: self._min_gradient,
ATTR_SAMPLE_DURATION: self._sample_duration,
ATTR_SAMPLE_COUNT: len(self.samples),
}
@property @property
def should_poll(self): def should_poll(self):
"""No polling needed.""" """No polling needed."""
return False return False
@asyncio.coroutine
def async_added_to_hass(self):
"""Complete device setup after being added to hass."""
@callback
def trend_sensor_state_listener(entity, old_state, new_state):
"""Handle state changes on the observed device."""
try:
if self._attribute:
state = new_state.attributes.get(self._attribute)
else:
state = new_state.state
if state != STATE_UNKNOWN:
sample = (utcnow().timestamp(), float(state))
self.samples.append(sample)
self.async_schedule_update_ha_state(True)
except (ValueError, TypeError) as ex:
_LOGGER.error(ex)
async_track_state_change(
self.hass, self._entity_id,
trend_sensor_state_listener)
@asyncio.coroutine @asyncio.coroutine
def async_update(self): def async_update(self):
"""Get the latest data and update the states.""" """Get the latest data and update the states."""
if self.from_state is None or self.to_state is None: # Remove outdated samples
return if self._sample_duration > 0:
if (self.from_state.state == STATE_UNKNOWN or cutoff = utcnow().timestamp() - self._sample_duration
self.to_state.state == STATE_UNKNOWN): while self.samples and self.samples[0][0] < cutoff:
return self.samples.popleft()
try:
if self._attribute:
from_value = float(
self.from_state.attributes.get(self._attribute))
to_value = float(
self.to_state.attributes.get(self._attribute))
else:
from_value = float(self.from_state.state)
to_value = float(self.to_state.state)
self._state = to_value > from_value if len(self.samples) < 2:
if self._invert: return
self._state = not self._state
except (ValueError, TypeError) as ex: # Calculate gradient of linear trend
self._state = None yield from self.hass.async_add_job(self._calculate_gradient)
_LOGGER.error(ex)
# Update state
self._state = (
abs(self._gradient) > abs(self._min_gradient) and
math.copysign(self._gradient, self._min_gradient) == self._gradient
)
if self._invert:
self._state = not self._state
def _calculate_gradient(self):
"""Compute the linear trend gradient of the current samples.
This need run inside executor.
"""
import numpy as np
timestamps = np.array([t for t, _ in self.samples])
values = np.array([s for _, s in self.samples])
coeffs = np.polyfit(timestamps, values, 1)
self._gradient = coeffs[0]

View File

@ -468,6 +468,7 @@ netdisco==1.2.2
# homeassistant.components.sensor.neurio_energy # homeassistant.components.sensor.neurio_energy
neurio==0.3.1 neurio==0.3.1
# homeassistant.components.binary_sensor.trend
# homeassistant.components.image_processing.opencv # homeassistant.components.image_processing.opencv
numpy==1.13.3 numpy==1.13.3

View File

@ -87,6 +87,10 @@ libsoundtouch==0.7.2
# homeassistant.components.switch.mfi # homeassistant.components.switch.mfi
mficlient==0.3.0 mficlient==0.3.0
# homeassistant.components.binary_sensor.trend
# homeassistant.components.image_processing.opencv
numpy==1.13.3
# homeassistant.components.mqtt # homeassistant.components.mqtt
# homeassistant.components.shiftr # homeassistant.components.shiftr
paho-mqtt==1.3.1 paho-mqtt==1.3.1

View File

@ -55,6 +55,7 @@ TEST_REQUIREMENTS = (
'libpurecoollink', 'libpurecoollink',
'libsoundtouch', 'libsoundtouch',
'mficlient', 'mficlient',
'numpy',
'paho-mqtt', 'paho-mqtt',
'pexpect', 'pexpect',
'pilight', 'pilight',

View File

@ -10,7 +10,7 @@ class TestTrendBinarySensor:
hass = None hass = None
def setup_method(self, method): def setup_method(self, method):
"""Setup things to be run when tests are started.""" """Set up things to be run when tests are started."""
self.hass = get_test_home_assistant() self.hass = get_test_home_assistant()
def teardown_method(self, method): def teardown_method(self, method):
@ -38,6 +38,67 @@ class TestTrendBinarySensor:
state = self.hass.states.get('binary_sensor.test_trend_sensor') state = self.hass.states.get('binary_sensor.test_trend_sensor')
assert state.state == 'on' assert state.state == 'on'
def test_up_using_trendline(self):
"""Test up trend using multiple samples and trendline calculation."""
assert setup.setup_component(self.hass, 'binary_sensor', {
'binary_sensor': {
'platform': 'trend',
'sensors': {
'test_trend_sensor': {
'entity_id': "sensor.test_state",
'sample_duration': 300,
'min_gradient': 1,
'max_samples': 25,
}
}
}
})
for val in [1, 0, 2, 3]:
self.hass.states.set('sensor.test_state', val)
self.hass.block_till_done()
state = self.hass.states.get('binary_sensor.test_trend_sensor')
assert state.state == 'on'
for val in [0, 1, 0, 0]:
self.hass.states.set('sensor.test_state', val)
self.hass.block_till_done()
state = self.hass.states.get('binary_sensor.test_trend_sensor')
assert state.state == 'off'
def test_down_using_trendline(self):
"""Test down trend using multiple samples and trendline calculation."""
assert setup.setup_component(self.hass, 'binary_sensor', {
'binary_sensor': {
'platform': 'trend',
'sensors': {
'test_trend_sensor': {
'entity_id': "sensor.test_state",
'sample_duration': 300,
'min_gradient': 1,
'max_samples': 25,
'invert': 'Yes'
}
}
}
})
for val in [3, 2, 3, 1]:
self.hass.states.set('sensor.test_state', val)
self.hass.block_till_done()
state = self.hass.states.get('binary_sensor.test_trend_sensor')
assert state.state == 'on'
for val in [4, 2, 4, 4]:
self.hass.states.set('sensor.test_state', val)
self.hass.block_till_done()
state = self.hass.states.get('binary_sensor.test_trend_sensor')
assert state.state == 'off'
def test_down(self): def test_down(self):
"""Test down trend.""" """Test down trend."""
assert setup.setup_component(self.hass, 'binary_sensor', { assert setup.setup_component(self.hass, 'binary_sensor', {
@ -59,7 +120,7 @@ class TestTrendBinarySensor:
state = self.hass.states.get('binary_sensor.test_trend_sensor') state = self.hass.states.get('binary_sensor.test_trend_sensor')
assert state.state == 'off' assert state.state == 'off'
def test__invert_up(self): def test_invert_up(self):
"""Test up trend with custom message.""" """Test up trend with custom message."""
assert setup.setup_component(self.hass, 'binary_sensor', { assert setup.setup_component(self.hass, 'binary_sensor', {
'binary_sensor': { 'binary_sensor': {
@ -142,11 +203,33 @@ class TestTrendBinarySensor:
self.hass.states.set('sensor.test_state', 'State', {'attr': '2'}) self.hass.states.set('sensor.test_state', 'State', {'attr': '2'})
self.hass.block_till_done() self.hass.block_till_done()
self.hass.states.set('sensor.test_state', 'State', {'attr': '1'}) self.hass.states.set('sensor.test_state', 'State', {'attr': '1'})
self.hass.block_till_done() self.hass.block_till_done()
state = self.hass.states.get('binary_sensor.test_trend_sensor') state = self.hass.states.get('binary_sensor.test_trend_sensor')
assert state.state == 'off' assert state.state == 'off'
def test_max_samples(self):
"""Test that sample count is limited correctly."""
assert setup.setup_component(self.hass, 'binary_sensor', {
'binary_sensor': {
'platform': 'trend',
'sensors': {
'test_trend_sensor': {
'entity_id': "sensor.test_state",
'max_samples': 3,
'min_gradient': -1,
}
}
}
})
for val in [0, 1, 2, 3, 2, 1]:
self.hass.states.set('sensor.test_state', val)
self.hass.block_till_done()
state = self.hass.states.get('binary_sensor.test_trend_sensor')
assert state.state == 'on'
assert state.attributes['sample_count'] == 3
def test_non_numeric(self): def test_non_numeric(self):
"""Test up trend.""" """Test up trend."""
assert setup.setup_component(self.hass, 'binary_sensor', { assert setup.setup_component(self.hass, 'binary_sensor', {
@ -186,7 +269,6 @@ class TestTrendBinarySensor:
self.hass.states.set('sensor.test_state', 'State', {'attr': '2'}) self.hass.states.set('sensor.test_state', 'State', {'attr': '2'})
self.hass.block_till_done() self.hass.block_till_done()
self.hass.states.set('sensor.test_state', 'State', {'attr': '1'}) self.hass.states.set('sensor.test_state', 'State', {'attr': '1'})
self.hass.block_till_done() self.hass.block_till_done()
state = self.hass.states.get('binary_sensor.test_trend_sensor') state = self.hass.states.get('binary_sensor.test_trend_sensor')
assert state.state == 'off' assert state.state == 'off'