Added Time based SMA to Filter Sensor (#13104)

* Added Time based SMA

* move "now" to _filter_state()

* Addressed comments

* fix long line

* type and name

* # pylint: disable=redefined-builtin

* added test
This commit is contained in:
Diogo Gomes 2018-03-18 15:58:52 +00:00 committed by Paulus Schoutsen
parent b45dad507a
commit 1e17b2fd63
2 changed files with 81 additions and 3 deletions

View File

@ -20,12 +20,14 @@ import homeassistant.helpers.config_validation as cv
from homeassistant.util.decorator import Registry
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.event import async_track_state_change
import homeassistant.util.dt as dt_util
_LOGGER = logging.getLogger(__name__)
FILTER_NAME_LOWPASS = 'lowpass'
FILTER_NAME_OUTLIER = 'outlier'
FILTER_NAME_THROTTLE = 'throttle'
FILTER_NAME_TIME_SMA = 'time_simple_moving_average'
FILTERS = Registry()
CONF_FILTERS = 'filters'
@ -34,6 +36,9 @@ CONF_FILTER_WINDOW_SIZE = 'window_size'
CONF_FILTER_PRECISION = 'precision'
CONF_FILTER_RADIUS = 'radius'
CONF_FILTER_TIME_CONSTANT = 'time_constant'
CONF_TIME_SMA_TYPE = 'type'
TIME_SMA_LAST = 'last'
DEFAULT_WINDOW_SIZE = 1
DEFAULT_PRECISION = 2
@ -44,24 +49,37 @@ NAME_TEMPLATE = "{} filter"
ICON = 'mdi:chart-line-variant'
FILTER_SCHEMA = vol.Schema({
vol.Optional(CONF_FILTER_WINDOW_SIZE,
default=DEFAULT_WINDOW_SIZE): vol.Coerce(int),
vol.Optional(CONF_FILTER_PRECISION,
default=DEFAULT_PRECISION): vol.Coerce(int),
})
# pylint: disable=redefined-builtin
FILTER_OUTLIER_SCHEMA = FILTER_SCHEMA.extend({
vol.Required(CONF_FILTER_NAME): FILTER_NAME_OUTLIER,
vol.Optional(CONF_FILTER_WINDOW_SIZE,
default=DEFAULT_WINDOW_SIZE): vol.Coerce(int),
vol.Optional(CONF_FILTER_RADIUS,
default=DEFAULT_FILTER_RADIUS): vol.Coerce(float),
})
FILTER_LOWPASS_SCHEMA = FILTER_SCHEMA.extend({
vol.Required(CONF_FILTER_NAME): FILTER_NAME_LOWPASS,
vol.Optional(CONF_FILTER_WINDOW_SIZE,
default=DEFAULT_WINDOW_SIZE): vol.Coerce(int),
vol.Optional(CONF_FILTER_TIME_CONSTANT,
default=DEFAULT_FILTER_TIME_CONSTANT): vol.Coerce(int),
})
FILTER_TIME_SMA_SCHEMA = FILTER_SCHEMA.extend({
vol.Required(CONF_FILTER_NAME): FILTER_NAME_TIME_SMA,
vol.Optional(CONF_TIME_SMA_TYPE,
default=TIME_SMA_LAST): vol.In(
[None, TIME_SMA_LAST]),
vol.Required(CONF_FILTER_WINDOW_SIZE): vol.All(cv.time_period,
cv.positive_timedelta)
})
FILTER_THROTTLE_SCHEMA = FILTER_SCHEMA.extend({
vol.Required(CONF_FILTER_NAME): FILTER_NAME_THROTTLE,
})
@ -72,6 +90,7 @@ PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_FILTERS): vol.All(cv.ensure_list,
[vol.Any(FILTER_OUTLIER_SCHEMA,
FILTER_LOWPASS_SCHEMA,
FILTER_TIME_SMA_SCHEMA,
FILTER_THROTTLE_SCHEMA)])
})
@ -277,6 +296,49 @@ class LowPassFilter(Filter):
return filtered
@FILTERS.register(FILTER_NAME_TIME_SMA)
class TimeSMAFilter(Filter):
"""Simple Moving Average (SMA) Filter.
The window_size is determined by time, and SMA is time weighted.
Args:
variant (enum): type of argorithm used to connect discrete values
"""
def __init__(self, window_size, precision, entity, type):
"""Initialize Filter."""
super().__init__(FILTER_NAME_TIME_SMA, 0, precision, entity)
self._time_window = int(window_size.total_seconds())
self.last_leak = None
self.queue = deque()
def _leak(self, now):
"""Remove timeouted elements."""
while self.queue:
timestamp, _ = self.queue[0]
if timestamp + self._time_window <= now:
self.last_leak = self.queue.popleft()
else:
return
def _filter_state(self, new_state):
now = int(dt_util.utcnow().timestamp())
self._leak(now)
self.queue.append((now, float(new_state)))
moving_sum = 0
start = now - self._time_window
_, prev_val = self.last_leak or (0, float(new_state))
for timestamp, val in self.queue:
moving_sum += (timestamp-start)*prev_val
start, prev_val = timestamp, val
moving_sum += (now-start)*prev_val
return moving_sum/self._time_window
@FILTERS.register(FILTER_NAME_THROTTLE)
class ThrottleFilter(Filter):
"""Throttle Filter.

View File

@ -1,8 +1,11 @@
"""The test for the data filter sensor platform."""
from datetime import timedelta
import unittest
from unittest.mock import patch
from homeassistant.components.sensor.filter import (
LowPassFilter, OutlierFilter, ThrottleFilter)
LowPassFilter, OutlierFilter, ThrottleFilter, TimeSMAFilter)
import homeassistant.util.dt as dt_util
from homeassistant.setup import setup_component
from tests.common import get_test_home_assistant, assert_setup_component
@ -90,3 +93,16 @@ class TestFilterSensor(unittest.TestCase):
if not filt.skip_processing:
filtered.append(new_state)
self.assertEqual([20, 21], filtered)
def test_time_sma(self):
"""Test if time_sma filter works."""
filt = TimeSMAFilter(window_size=timedelta(minutes=2),
precision=2,
entity=None,
type='last')
past = dt_util.utcnow() - timedelta(minutes=5)
for state in self.values:
with patch('homeassistant.util.dt.utcnow', return_value=past):
filtered = filt.filter_state(state)
past += timedelta(minutes=1)
self.assertEqual(21.5, filtered)