mirror of
https://github.com/home-assistant/core.git
synced 2025-07-15 17:27:10 +00:00
Merge pull request #13390 from nielstron/filter-band-pass
Adds a range filter to the Filter Sensor
This commit is contained in:
commit
66e33c7979
@ -28,6 +28,7 @@ import homeassistant.util.dt as dt_util
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
FILTER_NAME_RANGE = 'range'
|
||||
FILTER_NAME_LOWPASS = 'lowpass'
|
||||
FILTER_NAME_OUTLIER = 'outlier'
|
||||
FILTER_NAME_THROTTLE = 'throttle'
|
||||
@ -40,6 +41,8 @@ CONF_FILTER_WINDOW_SIZE = 'window_size'
|
||||
CONF_FILTER_PRECISION = 'precision'
|
||||
CONF_FILTER_RADIUS = 'radius'
|
||||
CONF_FILTER_TIME_CONSTANT = 'time_constant'
|
||||
CONF_FILTER_LOWER_BOUND = 'lower_bound'
|
||||
CONF_FILTER_UPPER_BOUND = 'upper_bound'
|
||||
CONF_TIME_SMA_TYPE = 'type'
|
||||
|
||||
TIME_SMA_LAST = 'last'
|
||||
@ -77,6 +80,12 @@ FILTER_LOWPASS_SCHEMA = FILTER_SCHEMA.extend({
|
||||
default=DEFAULT_FILTER_TIME_CONSTANT): vol.Coerce(int),
|
||||
})
|
||||
|
||||
FILTER_RANGE_SCHEMA = FILTER_SCHEMA.extend({
|
||||
vol.Required(CONF_FILTER_NAME): FILTER_NAME_RANGE,
|
||||
vol.Optional(CONF_FILTER_LOWER_BOUND): vol.Coerce(float),
|
||||
vol.Optional(CONF_FILTER_UPPER_BOUND): vol.Coerce(float),
|
||||
})
|
||||
|
||||
FILTER_TIME_SMA_SCHEMA = FILTER_SCHEMA.extend({
|
||||
vol.Required(CONF_FILTER_NAME): FILTER_NAME_TIME_SMA,
|
||||
vol.Optional(CONF_TIME_SMA_TYPE,
|
||||
@ -100,7 +109,8 @@ PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
|
||||
[vol.Any(FILTER_OUTLIER_SCHEMA,
|
||||
FILTER_LOWPASS_SCHEMA,
|
||||
FILTER_TIME_SMA_SCHEMA,
|
||||
FILTER_THROTTLE_SCHEMA)])
|
||||
FILTER_THROTTLE_SCHEMA,
|
||||
FILTER_RANGE_SCHEMA)])
|
||||
})
|
||||
|
||||
|
||||
@ -325,6 +335,49 @@ class Filter(object):
|
||||
return new_state
|
||||
|
||||
|
||||
@FILTERS.register(FILTER_NAME_RANGE)
|
||||
class RangeFilter(Filter):
|
||||
"""Range filter.
|
||||
|
||||
Determines if new state is in the range of upper_bound and lower_bound.
|
||||
If not inside, lower or upper bound is returned instead.
|
||||
|
||||
Args:
|
||||
upper_bound (float): band upper bound
|
||||
lower_bound (float): band lower bound
|
||||
"""
|
||||
|
||||
def __init__(self, entity,
|
||||
lower_bound, upper_bound):
|
||||
"""Initialize Filter."""
|
||||
super().__init__(FILTER_NAME_RANGE, entity=entity)
|
||||
self._lower_bound = lower_bound
|
||||
self._upper_bound = upper_bound
|
||||
self._stats_internal = Counter()
|
||||
|
||||
def _filter_state(self, new_state):
|
||||
"""Implement the range filter."""
|
||||
if self._upper_bound and new_state.state > self._upper_bound:
|
||||
|
||||
self._stats_internal['erasures_up'] += 1
|
||||
|
||||
_LOGGER.debug("Upper outlier nr. %s in %s: %s",
|
||||
self._stats_internal['erasures_up'],
|
||||
self._entity, new_state)
|
||||
new_state.state = self._upper_bound
|
||||
|
||||
elif self._lower_bound and new_state.state < self._lower_bound:
|
||||
|
||||
self._stats_internal['erasures_low'] += 1
|
||||
|
||||
_LOGGER.debug("Lower outlier nr. %s in %s: %s",
|
||||
self._stats_internal['erasures_low'],
|
||||
self._entity, new_state)
|
||||
new_state.state = self._lower_bound
|
||||
|
||||
return new_state
|
||||
|
||||
|
||||
@FILTERS.register(FILTER_NAME_OUTLIER)
|
||||
class OutlierFilter(Filter):
|
||||
"""BASIC outlier filter.
|
||||
|
@ -4,7 +4,8 @@ import unittest
|
||||
from unittest.mock import patch
|
||||
|
||||
from homeassistant.components.sensor.filter import (
|
||||
LowPassFilter, OutlierFilter, ThrottleFilter, TimeSMAFilter)
|
||||
LowPassFilter, OutlierFilter, ThrottleFilter, TimeSMAFilter,
|
||||
RangeFilter)
|
||||
import homeassistant.util.dt as dt_util
|
||||
from homeassistant.setup import setup_component
|
||||
import homeassistant.core as ha
|
||||
@ -131,6 +132,23 @@ class TestFilterSensor(unittest.TestCase):
|
||||
filtered = filt.filter_state(state)
|
||||
self.assertEqual(18.05, filtered.state)
|
||||
|
||||
def test_range(self):
|
||||
"""Test if range filter works."""
|
||||
lower = 10
|
||||
upper = 20
|
||||
filt = RangeFilter(entity=None,
|
||||
lower_bound=lower,
|
||||
upper_bound=upper)
|
||||
for unf_state in self.values:
|
||||
unf = float(unf_state.state)
|
||||
filtered = filt.filter_state(unf_state)
|
||||
if unf < lower:
|
||||
self.assertEqual(lower, filtered.state)
|
||||
elif unf > upper:
|
||||
self.assertEqual(upper, filtered.state)
|
||||
else:
|
||||
self.assertEqual(unf, filtered.state)
|
||||
|
||||
def test_throttle(self):
|
||||
"""Test if lowpass filter works."""
|
||||
filt = ThrottleFilter(window_size=3,
|
||||
|
Loading…
x
Reference in New Issue
Block a user