mirror of
https://github.com/home-assistant/core.git
synced 2025-07-23 21:27:38 +00:00
Rewrite influxdb metrics to be more consistent (#4791)
* Updated to make all metrics consistent * Updated existing test for new format * Updated checks on lists and dictionarys
This commit is contained in:
parent
ff0788324c
commit
4ef7e08553
@ -9,9 +9,8 @@ import logging
|
|||||||
import voluptuous as vol
|
import voluptuous as vol
|
||||||
|
|
||||||
from homeassistant.const import (
|
from homeassistant.const import (
|
||||||
EVENT_STATE_CHANGED, STATE_UNAVAILABLE, STATE_UNKNOWN, CONF_HOST,
|
EVENT_STATE_CHANGED, CONF_HOST, CONF_PORT, CONF_SSL, CONF_VERIFY_SSL,
|
||||||
CONF_PORT, CONF_SSL, CONF_VERIFY_SSL, CONF_USERNAME, CONF_BLACKLIST,
|
CONF_USERNAME, CONF_BLACKLIST, CONF_PASSWORD, CONF_WHITELIST)
|
||||||
CONF_PASSWORD, CONF_WHITELIST)
|
|
||||||
from homeassistant.helpers import state as state_helper
|
from homeassistant.helpers import state as state_helper
|
||||||
import homeassistant.helpers.config_validation as cv
|
import homeassistant.helpers.config_validation as cv
|
||||||
|
|
||||||
@ -38,7 +37,6 @@ CONFIG_SCHEMA = vol.Schema({
|
|||||||
vol.Optional(CONF_DB_NAME, default=DEFAULT_DATABASE): cv.string,
|
vol.Optional(CONF_DB_NAME, default=DEFAULT_DATABASE): cv.string,
|
||||||
vol.Optional(CONF_PORT): cv.port,
|
vol.Optional(CONF_PORT): cv.port,
|
||||||
vol.Optional(CONF_SSL): cv.boolean,
|
vol.Optional(CONF_SSL): cv.boolean,
|
||||||
vol.Optional(CONF_DEFAULT_MEASUREMENT): cv.string,
|
|
||||||
vol.Optional(CONF_TAGS, default={}):
|
vol.Optional(CONF_TAGS, default={}):
|
||||||
vol.Schema({cv.string: cv.string}),
|
vol.Schema({cv.string: cv.string}),
|
||||||
vol.Optional(CONF_WHITELIST, default=[]):
|
vol.Optional(CONF_WHITELIST, default=[]):
|
||||||
@ -78,7 +76,6 @@ def setup(hass, config):
|
|||||||
blacklist = conf.get(CONF_BLACKLIST)
|
blacklist = conf.get(CONF_BLACKLIST)
|
||||||
whitelist = conf.get(CONF_WHITELIST)
|
whitelist = conf.get(CONF_WHITELIST)
|
||||||
tags = conf.get(CONF_TAGS)
|
tags = conf.get(CONF_TAGS)
|
||||||
default_measurement = conf.get(CONF_DEFAULT_MEASUREMENT)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
influx = InfluxDBClient(**kwargs)
|
influx = InfluxDBClient(**kwargs)
|
||||||
@ -92,51 +89,57 @@ def setup(hass, config):
|
|||||||
def influx_event_listener(event):
|
def influx_event_listener(event):
|
||||||
"""Listen for new messages on the bus and sends them to Influx."""
|
"""Listen for new messages on the bus and sends them to Influx."""
|
||||||
state = event.data.get('new_state')
|
state = event.data.get('new_state')
|
||||||
if state is None or state.state in (
|
if state is None or state.entity_id in blacklist:
|
||||||
STATE_UNKNOWN, '', STATE_UNAVAILABLE) or \
|
return
|
||||||
state.entity_id in blacklist:
|
|
||||||
|
if whitelist and state.entity_id not in whitelist:
|
||||||
return
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if len(whitelist) > 0 and state.entity_id not in whitelist:
|
|
||||||
return
|
|
||||||
|
|
||||||
_state = state_helper.state_as_number(state)
|
_state = state_helper.state_as_number(state)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
_state = state.state
|
_state = state.state
|
||||||
|
|
||||||
measurement = state.attributes.get('unit_of_measurement')
|
# Create a counter for this state change
|
||||||
if measurement in (None, ''):
|
|
||||||
if default_measurement:
|
|
||||||
measurement = default_measurement
|
|
||||||
else:
|
|
||||||
measurement = state.entity_id
|
|
||||||
|
|
||||||
json_body = [
|
json_body = [
|
||||||
{
|
{
|
||||||
'measurement': measurement,
|
'measurement': "hass.state.count",
|
||||||
'tags': {
|
'tags': {
|
||||||
'domain': state.domain,
|
'domain': state.domain,
|
||||||
'entity_id': state.object_id,
|
'entity_id': state.object_id,
|
||||||
},
|
},
|
||||||
'time': event.time_fired,
|
'time': event.time_fired,
|
||||||
'fields': {
|
'fields': {
|
||||||
'value': _state,
|
'value': 1
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
|
|
||||||
for key, value in state.attributes.items():
|
|
||||||
if key != 'unit_of_measurement':
|
|
||||||
if isinstance(value, (str, float, bool)) or \
|
|
||||||
key.endswith('_id'):
|
|
||||||
json_body[0]['fields'][key] = value
|
|
||||||
elif isinstance(value, int):
|
|
||||||
# Prevent column data errors in influxDB.
|
|
||||||
json_body[0]['fields'][key] = float(value)
|
|
||||||
|
|
||||||
json_body[0]['tags'].update(tags)
|
json_body[0]['tags'].update(tags)
|
||||||
|
|
||||||
|
state_fields = {}
|
||||||
|
if isinstance(_state, (int, float)):
|
||||||
|
state_fields['value'] = float(_state)
|
||||||
|
|
||||||
|
for key, value in state.attributes.items():
|
||||||
|
if isinstance(value, (int, float)):
|
||||||
|
state_fields[key] = float(value)
|
||||||
|
|
||||||
|
if state_fields:
|
||||||
|
json_body.append(
|
||||||
|
{
|
||||||
|
'measurement': "hass.state",
|
||||||
|
'tags': {
|
||||||
|
'domain': state.domain,
|
||||||
|
'entity_id': state.object_id
|
||||||
|
},
|
||||||
|
'time': event.time_fired,
|
||||||
|
'fields': state_fields
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
json_body[1]['tags'].update(tags)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
influx.write_points(json_body)
|
influx.write_points(json_body)
|
||||||
except exceptions.InfluxDBClientError:
|
except exceptions.InfluxDBClientError:
|
||||||
|
@ -106,36 +106,43 @@ class TestInfluxDB(unittest.TestCase):
|
|||||||
"""Test the event listener."""
|
"""Test the event listener."""
|
||||||
self._setup()
|
self._setup()
|
||||||
|
|
||||||
valid = {
|
valid = {'1': 1, '1.0': 1.0, STATE_ON: 1, STATE_OFF: 0, 'str': 'str'}
|
||||||
'1': 1,
|
|
||||||
'1.0': 1.0,
|
|
||||||
STATE_ON: 1,
|
|
||||||
STATE_OFF: 0,
|
|
||||||
'foo': 'foo'
|
|
||||||
}
|
|
||||||
for in_, out in valid.items():
|
for in_, out in valid.items():
|
||||||
attrs = {
|
state = mock.MagicMock(state=in_, domain='fake',
|
||||||
'unit_of_measurement': 'foobars',
|
object_id='entity')
|
||||||
'longitude': '1.1',
|
|
||||||
'latitude': '2.2'
|
|
||||||
}
|
|
||||||
state = mock.MagicMock(
|
|
||||||
state=in_, domain='fake', object_id='entity', attributes=attrs)
|
|
||||||
event = mock.MagicMock(data={'new_state': state}, time_fired=12345)
|
event = mock.MagicMock(data={'new_state': state}, time_fired=12345)
|
||||||
body = [{
|
|
||||||
'measurement': 'foobars',
|
body = [
|
||||||
'tags': {
|
{
|
||||||
'domain': 'fake',
|
'measurement': 'hass.state.count',
|
||||||
'entity_id': 'entity',
|
'tags': {
|
||||||
},
|
'domain': 'fake',
|
||||||
'time': 12345,
|
'entity_id': 'entity',
|
||||||
'fields': {
|
},
|
||||||
'value': out,
|
'time': 12345,
|
||||||
'longitude': '1.1',
|
'fields': {
|
||||||
'latitude': '2.2'
|
'value': 1,
|
||||||
},
|
}
|
||||||
}]
|
}
|
||||||
|
]
|
||||||
|
|
||||||
|
if isinstance(out, (int, float)):
|
||||||
|
body.append(
|
||||||
|
{
|
||||||
|
'measurement': 'hass.state',
|
||||||
|
'tags': {
|
||||||
|
'domain': 'fake',
|
||||||
|
'entity_id': 'entity',
|
||||||
|
},
|
||||||
|
'time': 12345,
|
||||||
|
'fields': {
|
||||||
|
'value': float(out)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
self.handler_method(event)
|
self.handler_method(event)
|
||||||
|
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
mock_client.return_value.write_points.call_count, 1
|
mock_client.return_value.write_points.call_count, 1
|
||||||
)
|
)
|
||||||
@ -143,40 +150,7 @@ class TestInfluxDB(unittest.TestCase):
|
|||||||
mock_client.return_value.write_points.call_args,
|
mock_client.return_value.write_points.call_args,
|
||||||
mock.call(body)
|
mock.call(body)
|
||||||
)
|
)
|
||||||
mock_client.return_value.write_points.reset_mock()
|
|
||||||
|
|
||||||
def test_event_listener_no_units(self, mock_client):
|
|
||||||
"""Test the event listener for missing units."""
|
|
||||||
self._setup()
|
|
||||||
|
|
||||||
for unit in (None, ''):
|
|
||||||
if unit:
|
|
||||||
attrs = {'unit_of_measurement': unit}
|
|
||||||
else:
|
|
||||||
attrs = {}
|
|
||||||
state = mock.MagicMock(
|
|
||||||
state=1, domain='fake', entity_id='entity-id',
|
|
||||||
object_id='entity', attributes=attrs)
|
|
||||||
event = mock.MagicMock(data={'new_state': state}, time_fired=12345)
|
|
||||||
body = [{
|
|
||||||
'measurement': 'entity-id',
|
|
||||||
'tags': {
|
|
||||||
'domain': 'fake',
|
|
||||||
'entity_id': 'entity',
|
|
||||||
},
|
|
||||||
'time': 12345,
|
|
||||||
'fields': {
|
|
||||||
'value': 1,
|
|
||||||
},
|
|
||||||
}]
|
|
||||||
self.handler_method(event)
|
|
||||||
self.assertEqual(
|
|
||||||
mock_client.return_value.write_points.call_count, 1
|
|
||||||
)
|
|
||||||
self.assertEqual(
|
|
||||||
mock_client.return_value.write_points.call_args,
|
|
||||||
mock.call(body)
|
|
||||||
)
|
|
||||||
mock_client.return_value.write_points.reset_mock()
|
mock_client.return_value.write_points.reset_mock()
|
||||||
|
|
||||||
def test_event_listener_fail_write(self, mock_client):
|
def test_event_listener_fail_write(self, mock_client):
|
||||||
@ -191,39 +165,6 @@ class TestInfluxDB(unittest.TestCase):
|
|||||||
influx_client.exceptions.InfluxDBClientError('foo')
|
influx_client.exceptions.InfluxDBClientError('foo')
|
||||||
self.handler_method(event)
|
self.handler_method(event)
|
||||||
|
|
||||||
def test_event_listener_states(self, mock_client):
|
|
||||||
"""Test the event listener against ignored states."""
|
|
||||||
self._setup()
|
|
||||||
|
|
||||||
for state_state in (1, 'unknown', '', 'unavailable'):
|
|
||||||
state = mock.MagicMock(
|
|
||||||
state=state_state, domain='fake', entity_id='entity-id',
|
|
||||||
object_id='entity', attributes={})
|
|
||||||
event = mock.MagicMock(data={'new_state': state}, time_fired=12345)
|
|
||||||
body = [{
|
|
||||||
'measurement': 'entity-id',
|
|
||||||
'tags': {
|
|
||||||
'domain': 'fake',
|
|
||||||
'entity_id': 'entity',
|
|
||||||
},
|
|
||||||
'time': 12345,
|
|
||||||
'fields': {
|
|
||||||
'value': 1,
|
|
||||||
},
|
|
||||||
}]
|
|
||||||
self.handler_method(event)
|
|
||||||
if state_state == 1:
|
|
||||||
self.assertEqual(
|
|
||||||
mock_client.return_value.write_points.call_count, 1
|
|
||||||
)
|
|
||||||
self.assertEqual(
|
|
||||||
mock_client.return_value.write_points.call_args,
|
|
||||||
mock.call(body)
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
self.assertFalse(mock_client.return_value.write_points.called)
|
|
||||||
mock_client.return_value.write_points.reset_mock()
|
|
||||||
|
|
||||||
def test_event_listener_blacklist(self, mock_client):
|
def test_event_listener_blacklist(self, mock_client):
|
||||||
"""Test the event listener against a blacklist."""
|
"""Test the event listener against a blacklist."""
|
||||||
self._setup()
|
self._setup()
|
||||||
@ -233,18 +174,34 @@ class TestInfluxDB(unittest.TestCase):
|
|||||||
state=1, domain='fake', entity_id='fake.{}'.format(entity_id),
|
state=1, domain='fake', entity_id='fake.{}'.format(entity_id),
|
||||||
object_id=entity_id, attributes={})
|
object_id=entity_id, attributes={})
|
||||||
event = mock.MagicMock(data={'new_state': state}, time_fired=12345)
|
event = mock.MagicMock(data={'new_state': state}, time_fired=12345)
|
||||||
body = [{
|
|
||||||
'measurement': 'fake.{}'.format(entity_id),
|
body = [
|
||||||
'tags': {
|
{
|
||||||
'domain': 'fake',
|
'measurement': 'hass.state.count',
|
||||||
'entity_id': entity_id,
|
'tags': {
|
||||||
|
'domain': 'fake',
|
||||||
|
'entity_id': entity_id,
|
||||||
|
},
|
||||||
|
'time': 12345,
|
||||||
|
'fields': {
|
||||||
|
'value': 1,
|
||||||
|
}
|
||||||
},
|
},
|
||||||
'time': 12345,
|
{
|
||||||
'fields': {
|
'measurement': 'hass.state',
|
||||||
'value': 1,
|
'tags': {
|
||||||
},
|
'domain': 'fake',
|
||||||
}]
|
'entity_id': entity_id,
|
||||||
|
},
|
||||||
|
'time': 12345,
|
||||||
|
'fields': {
|
||||||
|
'value': 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
|
||||||
self.handler_method(event)
|
self.handler_method(event)
|
||||||
|
|
||||||
if entity_id == 'ok':
|
if entity_id == 'ok':
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
mock_client.return_value.write_points.call_count, 1
|
mock_client.return_value.write_points.call_count, 1
|
||||||
@ -255,6 +212,7 @@ class TestInfluxDB(unittest.TestCase):
|
|||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
self.assertFalse(mock_client.return_value.write_points.called)
|
self.assertFalse(mock_client.return_value.write_points.called)
|
||||||
|
|
||||||
mock_client.return_value.write_points.reset_mock()
|
mock_client.return_value.write_points.reset_mock()
|
||||||
|
|
||||||
def test_event_listener_invalid_type(self, mock_client):
|
def test_event_listener_invalid_type(self, mock_client):
|
||||||
@ -271,27 +229,43 @@ class TestInfluxDB(unittest.TestCase):
|
|||||||
for in_, out in valid.items():
|
for in_, out in valid.items():
|
||||||
attrs = {
|
attrs = {
|
||||||
'unit_of_measurement': 'foobars',
|
'unit_of_measurement': 'foobars',
|
||||||
'longitude': '1.1',
|
|
||||||
'latitude': '2.2',
|
|
||||||
'invalid_attribute': ['value1', 'value2']
|
'invalid_attribute': ['value1', 'value2']
|
||||||
}
|
}
|
||||||
state = mock.MagicMock(
|
state = mock.MagicMock(
|
||||||
state=in_, domain='fake', object_id='entity', attributes=attrs)
|
state=in_, domain='fake', object_id='entity', attributes=attrs)
|
||||||
event = mock.MagicMock(data={'new_state': state}, time_fired=12345)
|
event = mock.MagicMock(data={'new_state': state}, time_fired=12345)
|
||||||
body = [{
|
|
||||||
'measurement': 'foobars',
|
body = [
|
||||||
'tags': {
|
{
|
||||||
'domain': 'fake',
|
'measurement': 'hass.state.count',
|
||||||
'entity_id': 'entity',
|
'tags': {
|
||||||
},
|
'domain': 'fake',
|
||||||
'time': 12345,
|
'entity_id': 'entity',
|
||||||
'fields': {
|
},
|
||||||
'value': out,
|
'time': 12345,
|
||||||
'longitude': '1.1',
|
'fields': {
|
||||||
'latitude': '2.2'
|
'value': 1,
|
||||||
},
|
}
|
||||||
}]
|
}
|
||||||
|
]
|
||||||
|
|
||||||
|
if isinstance(out, (int, float)):
|
||||||
|
body.append(
|
||||||
|
{
|
||||||
|
'measurement': 'hass.state',
|
||||||
|
'tags': {
|
||||||
|
'domain': 'fake',
|
||||||
|
'entity_id': 'entity',
|
||||||
|
},
|
||||||
|
'time': 12345,
|
||||||
|
'fields': {
|
||||||
|
'value': float(out)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
self.handler_method(event)
|
self.handler_method(event)
|
||||||
|
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
mock_client.return_value.write_points.call_count, 1
|
mock_client.return_value.write_points.call_count, 1
|
||||||
)
|
)
|
||||||
@ -299,47 +273,5 @@ class TestInfluxDB(unittest.TestCase):
|
|||||||
mock_client.return_value.write_points.call_args,
|
mock_client.return_value.write_points.call_args,
|
||||||
mock.call(body)
|
mock.call(body)
|
||||||
)
|
)
|
||||||
mock_client.return_value.write_points.reset_mock()
|
|
||||||
|
|
||||||
def test_event_listener_default_measurement(self, mock_client):
|
|
||||||
"""Test the event listener with a default measurement."""
|
|
||||||
config = {
|
|
||||||
'influxdb': {
|
|
||||||
'host': 'host',
|
|
||||||
'username': 'user',
|
|
||||||
'password': 'pass',
|
|
||||||
'default_measurement': 'state',
|
|
||||||
'blacklist': ['fake.blacklisted']
|
|
||||||
}
|
|
||||||
}
|
|
||||||
assert setup_component(self.hass, influxdb.DOMAIN, config)
|
|
||||||
self.handler_method = self.hass.bus.listen.call_args_list[0][0][1]
|
|
||||||
|
|
||||||
for entity_id in ('ok', 'blacklisted'):
|
|
||||||
state = mock.MagicMock(
|
|
||||||
state=1, domain='fake', entity_id='fake.{}'.format(entity_id),
|
|
||||||
object_id=entity_id, attributes={})
|
|
||||||
event = mock.MagicMock(data={'new_state': state}, time_fired=12345)
|
|
||||||
body = [{
|
|
||||||
'measurement': 'state',
|
|
||||||
'tags': {
|
|
||||||
'domain': 'fake',
|
|
||||||
'entity_id': entity_id,
|
|
||||||
},
|
|
||||||
'time': 12345,
|
|
||||||
'fields': {
|
|
||||||
'value': 1,
|
|
||||||
},
|
|
||||||
}]
|
|
||||||
self.handler_method(event)
|
|
||||||
if entity_id == 'ok':
|
|
||||||
self.assertEqual(
|
|
||||||
mock_client.return_value.write_points.call_count, 1
|
|
||||||
)
|
|
||||||
self.assertEqual(
|
|
||||||
mock_client.return_value.write_points.call_args,
|
|
||||||
mock.call(body)
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
self.assertFalse(mock_client.return_value.write_points.called)
|
|
||||||
mock_client.return_value.write_points.reset_mock()
|
mock_client.return_value.write_points.reset_mock()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user