diff --git a/homeassistant/components/sensor/mqtt.py b/homeassistant/components/sensor/mqtt.py index 225ed07a622..68f49961cf9 100644 --- a/homeassistant/components/sensor/mqtt.py +++ b/homeassistant/components/sensor/mqtt.py @@ -16,7 +16,7 @@ from homeassistant.components import sensor from homeassistant.components.mqtt import ( ATTR_DISCOVERY_HASH, CONF_AVAILABILITY_TOPIC, CONF_STATE_TOPIC, CONF_PAYLOAD_AVAILABLE, CONF_PAYLOAD_NOT_AVAILABLE, CONF_QOS, - MqttAvailability, MqttDiscoveryUpdate, MqttEntityDeviceInfo) + MqttAvailability, MqttDiscoveryUpdate, MqttEntityDeviceInfo, subscription) from homeassistant.components.mqtt.discovery import MQTT_DISCOVERY_NEW from homeassistant.components.sensor import DEVICE_CLASSES_SCHEMA from homeassistant.const import ( @@ -58,7 +58,7 @@ PLATFORM_SCHEMA = mqtt.MQTT_RO_PLATFORM_SCHEMA.extend({ async def async_setup_platform(hass: HomeAssistantType, config: ConfigType, async_add_entities, discovery_info=None): """Set up MQTT sensors through configuration.yaml.""" - await _async_setup_entity(hass, config, async_add_entities) + await _async_setup_entity(config, async_add_entities) async def async_setup_entry(hass, config_entry, async_add_entities): @@ -66,7 +66,7 @@ async def async_setup_entry(hass, config_entry, async_add_entities): async def async_discover_sensor(discovery_payload): """Discover and add a discovered MQTT sensor.""" config = PLATFORM_SCHEMA(discovery_payload) - await _async_setup_entity(hass, config, async_add_entities, + await _async_setup_entity(config, async_add_entities, discovery_payload[ATTR_DISCOVERY_HASH]) async_dispatcher_connect(hass, @@ -74,67 +74,81 @@ async def async_setup_entry(hass, config_entry, async_add_entities): async_discover_sensor) -async def _async_setup_entity(hass: HomeAssistantType, config: ConfigType, - async_add_entities, discovery_hash=None): +async def _async_setup_entity(config: ConfigType, async_add_entities, + discovery_hash=None): """Set up MQTT sensor.""" - value_template = config.get(CONF_VALUE_TEMPLATE) - if value_template is not None: - value_template.hass = hass - - async_add_entities([MqttSensor( - config.get(CONF_NAME), - config.get(CONF_STATE_TOPIC), - config.get(CONF_QOS), - config.get(CONF_UNIT_OF_MEASUREMENT), - config.get(CONF_FORCE_UPDATE), - config.get(CONF_EXPIRE_AFTER), - config.get(CONF_ICON), - config.get(CONF_DEVICE_CLASS), - value_template, - config.get(CONF_JSON_ATTRS), - config.get(CONF_UNIQUE_ID), - config.get(CONF_AVAILABILITY_TOPIC), - config.get(CONF_PAYLOAD_AVAILABLE), - config.get(CONF_PAYLOAD_NOT_AVAILABLE), - config.get(CONF_DEVICE), - discovery_hash, - )]) + async_add_entities([MqttSensor(config, discovery_hash)]) class MqttSensor(MqttAvailability, MqttDiscoveryUpdate, MqttEntityDeviceInfo, Entity): """Representation of a sensor that can be updated using MQTT.""" - def __init__(self, name, state_topic, qos, unit_of_measurement, - force_update, expire_after, icon, device_class: Optional[str], - value_template, json_attributes, unique_id: Optional[str], - availability_topic, payload_available, payload_not_available, - device_config: Optional[ConfigType], discovery_hash): + def __init__(self, config, discovery_hash): """Initialize the sensor.""" - MqttAvailability.__init__(self, availability_topic, qos, - payload_available, payload_not_available) - MqttDiscoveryUpdate.__init__(self, discovery_hash) - MqttEntityDeviceInfo.__init__(self, device_config) self._state = STATE_UNKNOWN - self._name = name - self._state_topic = state_topic - self._qos = qos - self._unit_of_measurement = unit_of_measurement - self._force_update = force_update - self._template = value_template - self._expire_after = expire_after - self._icon = icon - self._device_class = device_class + self._sub_state = None self._expiration_trigger = None - self._json_attributes = set(json_attributes) - self._unique_id = unique_id self._attributes = None - self._discovery_hash = discovery_hash + + self._name = None + self._state_topic = None + self._qos = None + self._unit_of_measurement = None + self._force_update = None + self._template = None + self._expire_after = None + self._icon = None + self._device_class = None + self._json_attributes = None + self._unique_id = None + + # Load config + self._setup_from_config(config) + + availability_topic = config.get(CONF_AVAILABILITY_TOPIC) + payload_available = config.get(CONF_PAYLOAD_AVAILABLE) + payload_not_available = config.get(CONF_PAYLOAD_NOT_AVAILABLE) + device_config = config.get(CONF_DEVICE) + + MqttAvailability.__init__(self, availability_topic, self._qos, + payload_available, payload_not_available) + MqttDiscoveryUpdate.__init__(self, discovery_hash, + self.discovery_update) + MqttEntityDeviceInfo.__init__(self, device_config) async def async_added_to_hass(self): """Subscribe to MQTT events.""" await MqttAvailability.async_added_to_hass(self) await MqttDiscoveryUpdate.async_added_to_hass(self) + await self._subscribe_topics() + + async def discovery_update(self, discovery_payload): + """Handle updated discovery message.""" + config = PLATFORM_SCHEMA(discovery_payload) + self._setup_from_config(config) + await self.availability_discovery_update(config) + await self._subscribe_topics() + self.async_schedule_update_ha_state() + + def _setup_from_config(self, config): + """(Re)Setup the entity.""" + self._name = config.get(CONF_NAME) + self._state_topic = config.get(CONF_STATE_TOPIC) + self._qos = config.get(CONF_QOS) + self._unit_of_measurement = config.get(CONF_UNIT_OF_MEASUREMENT) + self._force_update = config.get(CONF_FORCE_UPDATE) + self._expire_after = config.get(CONF_EXPIRE_AFTER) + self._icon = config.get(CONF_ICON) + self._device_class = config.get(CONF_DEVICE_CLASS) + self._template = config.get(CONF_VALUE_TEMPLATE) + self._json_attributes = set(config.get(CONF_JSON_ATTRS)) + self._unique_id = config.get(CONF_UNIQUE_ID) + + async def _subscribe_topics(self): + """(Re)Subscribe to topics.""" + if self._template is not None: + self._template.hass = self.hass @callback def message_received(topic, payload, qos): @@ -173,8 +187,16 @@ class MqttSensor(MqttAvailability, MqttDiscoveryUpdate, MqttEntityDeviceInfo, self._state = payload self.async_schedule_update_ha_state() - await mqtt.async_subscribe(self.hass, self._state_topic, - message_received, self._qos) + self._sub_state = await subscription.async_subscribe_topics( + self.hass, self._sub_state, + {'state_topic': {'topic': self._state_topic, + 'msg_callback': message_received, + 'qos': self._qos}}) + + async def async_will_remove_from_hass(self): + """Unsubscribe when removed.""" + await subscription.async_unsubscribe_topics(self.hass, self._sub_state) + await MqttAvailability.async_will_remove_from_hass(self) @callback def value_is_expired(self, *_): diff --git a/tests/components/sensor/test_mqtt.py b/tests/components/sensor/test_mqtt.py index 15042805a66..78de05e1ff3 100644 --- a/tests/components/sensor/test_mqtt.py +++ b/tests/components/sensor/test_mqtt.py @@ -412,6 +412,39 @@ async def test_discovery_removal_sensor(hass, mqtt_mock, caplog): assert state is None +async def test_discovery_update_sensor(hass, mqtt_mock, caplog): + """Test removal of discovered sensor.""" + entry = MockConfigEntry(domain=mqtt.DOMAIN) + await async_start(hass, 'homeassistant', {}, entry) + data1 = ( + '{ "name": "Beer",' + ' "status_topic": "test_topic" }' + ) + data2 = ( + '{ "name": "Milk",' + ' "status_topic": "test_topic" }' + ) + async_fire_mqtt_message(hass, 'homeassistant/sensor/bla/config', + data1) + await hass.async_block_till_done() + + state = hass.states.get('sensor.beer') + assert state is not None + assert state.name == 'Beer' + + async_fire_mqtt_message(hass, 'homeassistant/sensor/bla/config', + data2) + await hass.async_block_till_done() + await hass.async_block_till_done() + + state = hass.states.get('sensor.beer') + assert state is not None + assert state.name == 'Milk' + + state = hass.states.get('sensor.milk') + assert state is None + + async def test_entity_device_info_with_identifier(hass, mqtt_mock): """Test MQTT sensor device registry integration.""" entry = MockConfigEntry(domain=mqtt.DOMAIN)