Add EntityFilter helper (#10221)

* Add EntityFilter helper

* Changes in entityfilter after code review

* Convert recorder to use EntityFilter

* Fix flake/lint errors in recorder

* Update entity filter helper to return function

* Update recorder to use updated entity filter

* Better docstrings in entityfilter

* Update entityfilter.py
This commit is contained in:
Matt White 2017-10-31 22:54:50 -06:00 committed by Paulus Schoutsen
parent b6324b511c
commit 9eaa057739
4 changed files with 195 additions and 21 deletions

View File

@ -20,12 +20,13 @@ from typing import Optional, Dict
import voluptuous as vol
from homeassistant.core import (
HomeAssistant, callback, split_entity_id, CoreState)
HomeAssistant, callback, CoreState)
from homeassistant.const import (
ATTR_ENTITY_ID, CONF_ENTITIES, CONF_EXCLUDE, CONF_DOMAINS,
CONF_INCLUDE, EVENT_HOMEASSISTANT_STOP, EVENT_HOMEASSISTANT_START,
EVENT_STATE_CHANGED, EVENT_TIME_CHANGED, MATCH_ALL)
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entityfilter import generate_filter
from homeassistant.helpers.event import async_track_time_interval
from homeassistant.helpers.typing import ConfigType
import homeassistant.util.dt as dt_util
@ -178,10 +179,10 @@ class Recorder(threading.Thread):
self.engine = None # type: Any
self.run_info = None # type: Any
self.include_e = include.get(CONF_ENTITIES, [])
self.include_d = include.get(CONF_DOMAINS, [])
self.exclude = exclude.get(CONF_ENTITIES, []) + \
exclude.get(CONF_DOMAINS, [])
self.entity_filter = generate_filter(include.get(CONF_DOMAINS, []),
include.get(CONF_ENTITIES, []),
exclude.get(CONF_DOMAINS, []),
exclude.get(CONF_ENTITIES, []))
self.exclude_t = exclude.get(CONF_EVENT_TYPES, [])
self.get_session = None
@ -290,21 +291,7 @@ class Recorder(threading.Thread):
entity_id = event.data.get(ATTR_ENTITY_ID)
if entity_id is not None:
domain = split_entity_id(entity_id)[0]
# Exclude entities OR
# Exclude domains, but include specific entities
if (entity_id in self.exclude) or \
(domain in self.exclude and
entity_id not in self.include_e):
self.queue.task_done()
continue
# Included domains only (excluded entities above) OR
# Include entities only, but only if no excludes
if (self.include_d and domain not in self.include_d) or \
(self.include_e and entity_id not in self.include_e
and not self.exclude):
if not self.entity_filter(entity_id):
self.queue.task_done()
continue

View File

@ -12,7 +12,8 @@ import voluptuous as vol
from homeassistant.loader import get_platform
from homeassistant.const import (
CONF_PLATFORM, CONF_SCAN_INTERVAL, TEMP_CELSIUS, TEMP_FAHRENHEIT,
CONF_DOMAINS, CONF_ENTITIES, CONF_EXCLUDE, CONF_INCLUDE, CONF_PLATFORM,
CONF_SCAN_INTERVAL, TEMP_CELSIUS, TEMP_FAHRENHEIT,
CONF_ALIAS, CONF_ENTITY_ID, CONF_VALUE_TEMPLATE, WEEKDAYS,
CONF_CONDITION, CONF_BELOW, CONF_ABOVE, CONF_TIMEOUT, SUN_EVENT_SUNSET,
SUN_EVENT_SUNRISE, CONF_UNIT_SYSTEM_IMPERIAL, CONF_UNIT_SYSTEM_METRIC)
@ -562,3 +563,16 @@ SCRIPT_SCHEMA = vol.All(
[vol.Any(SERVICE_SCHEMA, _SCRIPT_DELAY_SCHEMA,
_SCRIPT_WAIT_TEMPLATE_SCHEMA, EVENT_SCHEMA, CONDITION_SCHEMA)],
)
FILTER_SCHEMA = vol.Schema({
vol.Optional(CONF_EXCLUDE, default={}): vol.Schema({
vol.Optional(CONF_ENTITIES, default=[]): entity_ids,
vol.Optional(CONF_DOMAINS, default=[]):
vol.All(ensure_list, [string])
}),
vol.Optional(CONF_INCLUDE, default={}): vol.Schema({
vol.Optional(CONF_ENTITIES, default=[]): entity_ids,
vol.Optional(CONF_DOMAINS, default=[]):
vol.All(ensure_list, [string])
})
})

View File

@ -0,0 +1,78 @@
"""Helper class to implement include/exclude of entities and domains."""
from homeassistant.core import split_entity_id
def generate_filter(include_domains, include_entities,
exclude_domains, exclude_entities):
"""Return a function that will filter entities based on the args."""
include_d = set(include_domains)
include_e = set(include_entities)
exclude_d = set(exclude_domains)
exclude_e = set(exclude_entities)
have_exclude = bool(exclude_e or exclude_d)
have_include = bool(include_e or include_d)
# Case 1 - no includes or excludes - pass all entities
if not have_include and not have_exclude:
return lambda entity_id: True
# Case 2 - includes, no excludes - only include specified entities
if have_include and not have_exclude:
def entity_filter_2(entity_id):
"""Return filter function for case 2."""
domain = split_entity_id(entity_id)[0]
return (entity_id in include_e or
domain in include_d)
return entity_filter_2
# Case 3 - excludes, no includes - only exclude specified entities
if not have_include and have_exclude:
def entity_filter_3(entity_id):
"""Return filter function for case 3."""
domain = split_entity_id(entity_id)[0]
return (entity_id not in exclude_e and
domain not in exclude_d)
return entity_filter_3
# Case 4 - both includes and excludes specified
# Case 4a - include domain specified
# - if domain is included, and entity not excluded, pass
# - if domain is not included, and entity not included, fail
# note: if both include and exclude domains specified,
# the exclude domains are ignored
if include_d:
def entity_filter_4a(entity_id):
"""Return filter function for case 4a."""
domain = split_entity_id(entity_id)[0]
if domain in include_d:
return entity_id not in exclude_e
else:
return entity_id in include_e
return entity_filter_4a
# Case 4b - exclude domain specified
# - if domain is excluded, and entity not included, fail
# - if domain is not excluded, and entity not excluded, pass
if exclude_d:
def entity_filter_4b(entity_id):
"""Return filter function for case 4b."""
domain = split_entity_id(entity_id)[0]
if domain in exclude_d:
return entity_id in include_e
else:
return entity_id not in exclude_e
return entity_filter_4b
# Case 4c - neither include or exclude domain specified
# - Only pass if entity is included. Ignore entity excludes.
def entity_filter_4c(entity_id):
"""Return filter function for case 4c."""
return entity_id in include_e
return entity_filter_4c

View File

@ -0,0 +1,95 @@
"""The tests for the EntityFitler component."""
from homeassistant.helpers.entityfilter import generate_filter
def test_no_filters_case_1():
"""If include and exclude not included, pass everything."""
incl_dom = {}
incl_ent = {}
excl_dom = {}
excl_ent = {}
testfilter = generate_filter(incl_dom, incl_ent, excl_dom, excl_ent)
for value in ("sensor.test", "sun.sun", "light.test"):
assert testfilter(value)
def test_includes_only_case_2():
"""If include specified, only pass if specified (Case 2)."""
incl_dom = {'light', 'sensor'}
incl_ent = {'binary_sensor.working'}
excl_dom = {}
excl_ent = {}
testfilter = generate_filter(incl_dom, incl_ent, excl_dom, excl_ent)
assert testfilter("sensor.test")
assert testfilter("light.test")
assert testfilter("binary_sensor.working")
assert testfilter("binary_sensor.notworking") is False
assert testfilter("sun.sun") is False
def test_excludes_only_case_3():
"""If exclude specified, pass all but specified (Case 3)."""
incl_dom = {}
incl_ent = {}
excl_dom = {'light', 'sensor'}
excl_ent = {'binary_sensor.working'}
testfilter = generate_filter(incl_dom, incl_ent, excl_dom, excl_ent)
assert testfilter("sensor.test") is False
assert testfilter("light.test") is False
assert testfilter("binary_sensor.working") is False
assert testfilter("binary_sensor.another")
assert testfilter("sun.sun") is True
def test_with_include_domain_case4a():
"""Test case 4a - include and exclude specified, with included domain."""
incl_dom = {'light', 'sensor'}
incl_ent = {'binary_sensor.working'}
excl_dom = {}
excl_ent = {'light.ignoreme', 'sensor.notworking'}
testfilter = generate_filter(incl_dom, incl_ent, excl_dom, excl_ent)
assert testfilter("sensor.test")
assert testfilter("sensor.notworking") is False
assert testfilter("light.test")
assert testfilter("light.ignoreme") is False
assert testfilter("binary_sensor.working")
assert testfilter("binary_sensor.another") is False
assert testfilter("sun.sun") is False
def test_exclude_domain_case4b():
"""Test case 4b - include and exclude specified, with excluded domain."""
incl_dom = {}
incl_ent = {'binary_sensor.working'}
excl_dom = {'binary_sensor'}
excl_ent = {'light.ignoreme', 'sensor.notworking'}
testfilter = generate_filter(incl_dom, incl_ent, excl_dom, excl_ent)
assert testfilter("sensor.test")
assert testfilter("sensor.notworking") is False
assert testfilter("light.test")
assert testfilter("light.ignoreme") is False
assert testfilter("binary_sensor.working")
assert testfilter("binary_sensor.another") is False
assert testfilter("sun.sun") is True
def testno_domain_case4c():
"""Test case 4c - include and exclude specified, with no domains."""
incl_dom = {}
incl_ent = {'binary_sensor.working'}
excl_dom = {}
excl_ent = {'light.ignoreme', 'sensor.notworking'}
testfilter = generate_filter(incl_dom, incl_ent, excl_dom, excl_ent)
assert testfilter("sensor.test") is False
assert testfilter("sensor.notworking") is False
assert testfilter("light.test") is False
assert testfilter("light.ignoreme") is False
assert testfilter("binary_sensor.working")
assert testfilter("binary_sensor.another") is False
assert testfilter("sun.sun") is False