Adding expire_after to mqtt sensor to expire outdated values (#6708)

* Adding expire_after to mqtt sensor to expire outdated values

* Extending test case

* mqtt: refactoring expire_after to use timed events instead of polling; lint

* refactor to reset unused trigger

* Fix: handler must be set to None after execution or removal to avoid warning

* Commenting out non-working test

* Fix lint

* Commit to trigger new build

* Commit to trigger new build

* Make testcase work

* Undo unnecessary change

* Remove default value, add extra check
This commit is contained in:
micw 2017-03-23 22:55:07 +01:00 committed by Adam Mills
parent 3acd926d29
commit 6c5989895a
2 changed files with 104 additions and 2 deletions

View File

@ -6,6 +6,7 @@ https://home-assistant.io/components/sensor.mqtt/
""" """
import asyncio import asyncio
import logging import logging
from datetime import timedelta
import voluptuous as vol import voluptuous as vol
@ -16,10 +17,13 @@ from homeassistant.const import (
from homeassistant.helpers.entity import Entity from homeassistant.helpers.entity import Entity
import homeassistant.components.mqtt as mqtt import homeassistant.components.mqtt as mqtt
import homeassistant.helpers.config_validation as cv import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.event import async_track_point_in_utc_time
from homeassistant.util import dt as dt_util
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
CONF_FORCE_UPDATE = 'force_update' CONF_FORCE_UPDATE = 'force_update'
CONF_EXPIRE_AFTER = 'expire_after'
DEFAULT_NAME = 'MQTT Sensor' DEFAULT_NAME = 'MQTT Sensor'
DEFAULT_FORCE_UPDATE = False DEFAULT_FORCE_UPDATE = False
@ -28,6 +32,7 @@ DEPENDENCIES = ['mqtt']
PLATFORM_SCHEMA = mqtt.MQTT_RO_PLATFORM_SCHEMA.extend({ PLATFORM_SCHEMA = mqtt.MQTT_RO_PLATFORM_SCHEMA.extend({
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string, vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_UNIT_OF_MEASUREMENT): cv.string, vol.Optional(CONF_UNIT_OF_MEASUREMENT): cv.string,
vol.Optional(CONF_EXPIRE_AFTER): cv.positive_int,
vol.Optional(CONF_FORCE_UPDATE, default=DEFAULT_FORCE_UPDATE): cv.boolean, vol.Optional(CONF_FORCE_UPDATE, default=DEFAULT_FORCE_UPDATE): cv.boolean,
}) })
@ -48,6 +53,7 @@ def async_setup_platform(hass, config, async_add_devices, discovery_info=None):
config.get(CONF_QOS), config.get(CONF_QOS),
config.get(CONF_UNIT_OF_MEASUREMENT), config.get(CONF_UNIT_OF_MEASUREMENT),
config.get(CONF_FORCE_UPDATE), config.get(CONF_FORCE_UPDATE),
config.get(CONF_EXPIRE_AFTER),
value_template, value_template,
)]) )])
@ -56,7 +62,7 @@ class MqttSensor(Entity):
"""Representation of a sensor that can be updated using MQTT.""" """Representation of a sensor that can be updated using MQTT."""
def __init__(self, name, state_topic, qos, unit_of_measurement, def __init__(self, name, state_topic, qos, unit_of_measurement,
force_update, value_template): force_update, expire_after, value_template):
"""Initialize the sensor.""" """Initialize the sensor."""
self._state = STATE_UNKNOWN self._state = STATE_UNKNOWN
self._name = name self._name = name
@ -65,6 +71,8 @@ class MqttSensor(Entity):
self._unit_of_measurement = unit_of_measurement self._unit_of_measurement = unit_of_measurement
self._force_update = force_update self._force_update = force_update
self._template = value_template self._template = value_template
self._expire_after = expire_after
self._expiration_trigger = None
def async_added_to_hass(self): def async_added_to_hass(self):
"""Subscribe mqtt events. """Subscribe mqtt events.
@ -74,6 +82,22 @@ class MqttSensor(Entity):
@callback @callback
def message_received(topic, payload, qos): def message_received(topic, payload, qos):
"""A new MQTT message has been received.""" """A new MQTT message has been received."""
# auto-expire enabled?
if self._expire_after is not None and self._expire_after > 0:
# Reset old trigger
if self._expiration_trigger:
self._expiration_trigger()
self._expiration_trigger = None
# Set new trigger
expiration_at = (
dt_util.utcnow() + timedelta(seconds=self._expire_after))
self._expiration_trigger = async_track_point_in_utc_time(
self.hass,
self.value_is_expired,
expiration_at)
if self._template is not None: if self._template is not None:
payload = self._template.async_render_with_possible_json_value( payload = self._template.async_render_with_possible_json_value(
payload, self._state) payload, self._state)
@ -83,6 +107,13 @@ class MqttSensor(Entity):
return mqtt.async_subscribe( return mqtt.async_subscribe(
self.hass, self._state_topic, message_received, self._qos) self.hass, self._state_topic, message_received, self._qos)
@callback
def value_is_expired(self, *_):
"""Triggered when value is expired."""
self._expiration_trigger = None
self._state = STATE_UNKNOWN
self.hass.async_add_job(self.async_update_ha_state())
@property @property
def should_poll(self): def should_poll(self):
"""No polling needed.""" """No polling needed."""

View File

@ -1,12 +1,16 @@
"""The tests for the MQTT sensor platform.""" """The tests for the MQTT sensor platform."""
import unittest import unittest
from datetime import timedelta, datetime
from unittest.mock import patch
import homeassistant.core as ha import homeassistant.core as ha
from homeassistant.setup import setup_component from homeassistant.setup import setup_component
import homeassistant.components.sensor as sensor import homeassistant.components.sensor as sensor
from homeassistant.const import EVENT_STATE_CHANGED from homeassistant.const import EVENT_STATE_CHANGED
from tests.common import mock_mqtt_component, fire_mqtt_message import homeassistant.util.dt as dt_util
from tests.common import mock_mqtt_component, fire_mqtt_message
from tests.common import get_test_home_assistant, mock_component from tests.common import get_test_home_assistant, mock_component
@ -42,6 +46,69 @@ class TestSensorMQTT(unittest.TestCase):
self.assertEqual('fav unit', self.assertEqual('fav unit',
state.attributes.get('unit_of_measurement')) state.attributes.get('unit_of_measurement'))
@patch('homeassistant.core.dt_util.utcnow')
def test_setting_sensor_value_expires(self, mock_utcnow):
"""Test the expiration of the value."""
mock_component(self.hass, 'mqtt')
assert setup_component(self.hass, sensor.DOMAIN, {
sensor.DOMAIN: {
'platform': 'mqtt',
'name': 'test',
'state_topic': 'test-topic',
'unit_of_measurement': 'fav unit',
'expire_after': '4',
'force_update': True
}
})
state = self.hass.states.get('sensor.test')
self.assertEqual('unknown', state.state)
now = datetime(2017, 1, 1, 1, tzinfo=dt_util.UTC)
mock_utcnow.return_value = now
fire_mqtt_message(self.hass, 'test-topic', '100')
self.hass.block_till_done()
# Value was set correctly.
state = self.hass.states.get('sensor.test')
self.assertEqual('100', state.state)
# Time jump +3s
now = now + timedelta(seconds=3)
self._send_time_changed(now)
self.hass.block_till_done()
# Value is not yet expired
state = self.hass.states.get('sensor.test')
self.assertEqual('100', state.state)
# Next message resets timer
mock_utcnow.return_value = now
fire_mqtt_message(self.hass, 'test-topic', '101')
self.hass.block_till_done()
# Value was updated correctly.
state = self.hass.states.get('sensor.test')
self.assertEqual('101', state.state)
# Time jump +3s
now = now + timedelta(seconds=3)
self._send_time_changed(now)
self.hass.block_till_done()
# Value is not yet expired
state = self.hass.states.get('sensor.test')
self.assertEqual('101', state.state)
# Time jump +2s
now = now + timedelta(seconds=2)
self._send_time_changed(now)
self.hass.block_till_done()
# Value is expired now
state = self.hass.states.get('sensor.test')
self.assertEqual('unknown', state.state)
def test_setting_sensor_value_via_mqtt_json_message(self): def test_setting_sensor_value_via_mqtt_json_message(self):
"""Test the setting of the value via MQTT with JSON playload.""" """Test the setting of the value via MQTT with JSON playload."""
mock_component(self.hass, 'mqtt') mock_component(self.hass, 'mqtt')
@ -117,3 +184,7 @@ class TestSensorMQTT(unittest.TestCase):
fire_mqtt_message(self.hass, 'test-topic', '100') fire_mqtt_message(self.hass, 'test-topic', '100')
self.hass.block_till_done() self.hass.block_till_done()
self.assertEqual(2, len(events)) self.assertEqual(2, len(events))
def _send_time_changed(self, now):
"""Send a time changed event."""
self.hass.bus.fire(ha.EVENT_TIME_CHANGED, {ha.ATTR_NOW: now})