Use _attr_ in MQTT climate (#81406)

* Use _attr_ in MQTT climate

* Follow up comment

* Do not change code
This commit is contained in:
Jan Bouwhuis 2022-11-03 10:58:37 +01:00 committed by GitHub
parent 328eda044a
commit 203c83b6f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -374,16 +374,6 @@ class MqttClimate(MqttEntity, ClimateEntity):
def __init__(self, hass, config, config_entry, discovery_data):
"""Initialize the climate device."""
self._action = None
self._aux = False
self._current_fan_mode = None
self._current_operation = None
self._current_swing_mode = None
self._current_temp = None
self._preset_mode = None
self._target_temp = None
self._target_temp_high = None
self._target_temp_low = None
self._topic = None
self._value_templates = None
self._command_templates = None
@ -399,36 +389,54 @@ class MqttClimate(MqttEntity, ClimateEntity):
def _setup_from_config(self, config):
"""(Re)Setup the entity."""
self._attr_hvac_modes = config[CONF_MODE_LIST]
self._attr_min_temp = config[CONF_TEMP_MIN]
self._attr_max_temp = config[CONF_TEMP_MAX]
self._attr_precision = config.get(CONF_PRECISION, super().precision)
self._attr_fan_modes = config[CONF_FAN_MODE_LIST]
self._attr_swing_modes = config[CONF_SWING_MODE_LIST]
self._attr_target_temperature_step = config[CONF_TEMP_STEP]
self._attr_temperature_unit = config.get(
CONF_TEMPERATURE_UNIT, self.hass.config.units.temperature_unit
)
self._topic = {key: config.get(key) for key in TOPIC_KEYS}
# set to None in non-optimistic mode
self._target_temp = (
self._current_fan_mode
) = self._current_operation = self._current_swing_mode = None
self._target_temp_low = None
self._target_temp_high = None
self._attr_target_temperature = None
self._attr_fan_mode = None
self._attr_hvac_mode = None
self._attr_swing_mode = None
self._attr_target_temperature_low = None
self._attr_target_temperature_high = None
if self._topic[CONF_TEMP_STATE_TOPIC] is None:
self._target_temp = config[CONF_TEMP_INITIAL]
self._attr_target_temperature = config[CONF_TEMP_INITIAL]
if self._topic[CONF_TEMP_LOW_STATE_TOPIC] is None:
self._target_temp_low = config[CONF_TEMP_INITIAL]
self._attr_target_temperature_low = config[CONF_TEMP_INITIAL]
if self._topic[CONF_TEMP_HIGH_STATE_TOPIC] is None:
self._target_temp_high = config[CONF_TEMP_INITIAL]
self._attr_target_temperature_high = config[CONF_TEMP_INITIAL]
if self._topic[CONF_FAN_MODE_STATE_TOPIC] is None:
self._current_fan_mode = FAN_LOW
self._attr_fan_mode = FAN_LOW
if self._topic[CONF_SWING_MODE_STATE_TOPIC] is None:
self._current_swing_mode = SWING_OFF
self._attr_swing_mode = SWING_OFF
if self._topic[CONF_MODE_STATE_TOPIC] is None:
self._current_operation = HVACMode.OFF
self._attr_hvac_mode = HVACMode.OFF
self._feature_preset_mode = CONF_PRESET_MODE_COMMAND_TOPIC in config
if self._feature_preset_mode:
self._preset_modes = config[CONF_PRESET_MODES_LIST]
presets = []
presets.extend(config[CONF_PRESET_MODES_LIST])
if presets:
presets.insert(0, PRESET_NONE)
self._attr_preset_modes = presets
self._attr_preset_mode = PRESET_NONE
else:
self._preset_modes = []
self._attr_preset_modes = []
self._optimistic_preset_mode = CONF_PRESET_MODE_STATE_TOPIC not in config
self._action = None
self._aux = False
self._attr_hvac_action = None
self._attr_is_aux_heat = False
value_templates = {}
for key in VALUE_TEMPLATE_KEYS:
@ -455,437 +463,7 @@ class MqttClimate(MqttEntity, ClimateEntity):
self._command_templates = command_templates
def _prepare_subscribe_topics(self): # noqa: C901
"""(Re)Subscribe to topics."""
topics = {}
qos = self._config[CONF_QOS]
def add_subscription(topics, topic, msg_callback):
if self._topic[topic] is not None:
topics[topic] = {
"topic": self._topic[topic],
"msg_callback": msg_callback,
"qos": qos,
"encoding": self._config[CONF_ENCODING] or None,
}
def render_template(msg, template_name):
template = self._value_templates[template_name]
return template(msg.payload)
@callback
@log_messages(self.hass, self.entity_id)
def handle_action_received(msg):
"""Handle receiving action via MQTT."""
payload = render_template(msg, CONF_ACTION_TEMPLATE)
if not payload or payload == PAYLOAD_NONE:
_LOGGER.debug(
"Invalid %s action: %s, ignoring",
[e.value for e in HVACAction],
payload,
)
return
try:
self._action = HVACAction(payload)
except ValueError:
_LOGGER.warning(
"Invalid %s action: %s",
[e.value for e in HVACAction],
payload,
)
return
get_mqtt_data(self.hass).state_write_requests.write_state_request(self)
add_subscription(topics, CONF_ACTION_TOPIC, handle_action_received)
@callback
def handle_temperature_received(msg, template_name, attr):
"""Handle temperature coming via MQTT."""
payload = render_template(msg, template_name)
try:
setattr(self, attr, float(payload))
get_mqtt_data(self.hass).state_write_requests.write_state_request(self)
except ValueError:
_LOGGER.error("Could not parse temperature from %s", payload)
@callback
@log_messages(self.hass, self.entity_id)
def handle_current_temperature_received(msg):
"""Handle current temperature coming via MQTT."""
handle_temperature_received(
msg, CONF_CURRENT_TEMP_TEMPLATE, "_current_temp"
)
add_subscription(
topics, CONF_CURRENT_TEMP_TOPIC, handle_current_temperature_received
)
@callback
@log_messages(self.hass, self.entity_id)
def handle_target_temperature_received(msg):
"""Handle target temperature coming via MQTT."""
handle_temperature_received(msg, CONF_TEMP_STATE_TEMPLATE, "_target_temp")
add_subscription(
topics, CONF_TEMP_STATE_TOPIC, handle_target_temperature_received
)
@callback
@log_messages(self.hass, self.entity_id)
def handle_temperature_low_received(msg):
"""Handle target temperature low coming via MQTT."""
handle_temperature_received(
msg, CONF_TEMP_LOW_STATE_TEMPLATE, "_target_temp_low"
)
add_subscription(
topics, CONF_TEMP_LOW_STATE_TOPIC, handle_temperature_low_received
)
@callback
@log_messages(self.hass, self.entity_id)
def handle_temperature_high_received(msg):
"""Handle target temperature high coming via MQTT."""
handle_temperature_received(
msg, CONF_TEMP_HIGH_STATE_TEMPLATE, "_target_temp_high"
)
add_subscription(
topics, CONF_TEMP_HIGH_STATE_TOPIC, handle_temperature_high_received
)
@callback
def handle_mode_received(msg, template_name, attr, mode_list):
"""Handle receiving listed mode via MQTT."""
payload = render_template(msg, template_name)
if payload not in self._config[mode_list]:
_LOGGER.error("Invalid %s mode: %s", mode_list, payload)
else:
setattr(self, attr, payload)
get_mqtt_data(self.hass).state_write_requests.write_state_request(self)
@callback
@log_messages(self.hass, self.entity_id)
def handle_current_mode_received(msg):
"""Handle receiving mode via MQTT."""
handle_mode_received(
msg, CONF_MODE_STATE_TEMPLATE, "_current_operation", CONF_MODE_LIST
)
add_subscription(topics, CONF_MODE_STATE_TOPIC, handle_current_mode_received)
@callback
@log_messages(self.hass, self.entity_id)
def handle_fan_mode_received(msg):
"""Handle receiving fan mode via MQTT."""
handle_mode_received(
msg,
CONF_FAN_MODE_STATE_TEMPLATE,
"_current_fan_mode",
CONF_FAN_MODE_LIST,
)
add_subscription(topics, CONF_FAN_MODE_STATE_TOPIC, handle_fan_mode_received)
@callback
@log_messages(self.hass, self.entity_id)
def handle_swing_mode_received(msg):
"""Handle receiving swing mode via MQTT."""
handle_mode_received(
msg,
CONF_SWING_MODE_STATE_TEMPLATE,
"_current_swing_mode",
CONF_SWING_MODE_LIST,
)
add_subscription(
topics, CONF_SWING_MODE_STATE_TOPIC, handle_swing_mode_received
)
@callback
def handle_onoff_mode_received(msg, template_name, attr):
"""Handle receiving on/off mode via MQTT."""
payload = render_template(msg, template_name)
payload_on = self._config[CONF_PAYLOAD_ON]
payload_off = self._config[CONF_PAYLOAD_OFF]
if payload == "True":
payload = payload_on
elif payload == "False":
payload = payload_off
if payload == payload_on:
setattr(self, attr, True)
elif payload == payload_off:
setattr(self, attr, False)
else:
_LOGGER.error("Invalid %s mode: %s", attr, payload)
get_mqtt_data(self.hass).state_write_requests.write_state_request(self)
@callback
@log_messages(self.hass, self.entity_id)
def handle_aux_mode_received(msg):
"""Handle receiving aux mode via MQTT."""
handle_onoff_mode_received(msg, CONF_AUX_STATE_TEMPLATE, "_aux")
add_subscription(topics, CONF_AUX_STATE_TOPIC, handle_aux_mode_received)
@callback
@log_messages(self.hass, self.entity_id)
def handle_preset_mode_received(msg):
"""Handle receiving preset mode via MQTT."""
preset_mode = render_template(msg, CONF_PRESET_MODE_VALUE_TEMPLATE)
if preset_mode in [PRESET_NONE, PAYLOAD_NONE]:
self._preset_mode = None
get_mqtt_data(self.hass).state_write_requests.write_state_request(self)
return
if not preset_mode:
_LOGGER.debug("Ignoring empty preset_mode from '%s'", msg.topic)
return
if preset_mode not in self._preset_modes:
_LOGGER.warning(
"'%s' received on topic %s. '%s' is not a valid preset mode",
msg.payload,
msg.topic,
preset_mode,
)
else:
self._preset_mode = preset_mode
get_mqtt_data(self.hass).state_write_requests.write_state_request(self)
add_subscription(
topics, CONF_PRESET_MODE_STATE_TOPIC, handle_preset_mode_received
)
self._sub_state = subscription.async_prepare_subscribe_topics(
self.hass, self._sub_state, topics
)
async def _subscribe_topics(self):
"""(Re)Subscribe to topics."""
await subscription.async_subscribe_topics(self.hass, self._sub_state)
@property
def temperature_unit(self) -> str:
"""Return the unit of measurement."""
if unit := self._config.get(CONF_TEMPERATURE_UNIT):
return unit
return self.hass.config.units.temperature_unit
@property
def current_temperature(self) -> float | None:
"""Return the current temperature."""
return self._current_temp
@property
def target_temperature(self) -> float | None:
"""Return the temperature we try to reach."""
return self._target_temp
@property
def target_temperature_low(self) -> float | None:
"""Return the low target temperature we try to reach."""
return self._target_temp_low
@property
def target_temperature_high(self) -> float | None:
"""Return the high target temperature we try to reach."""
return self._target_temp_high
@property
def hvac_action(self) -> HVACAction | None:
"""Return the current running hvac operation if supported."""
return self._action
@property
def hvac_mode(self) -> HVACMode:
"""Return current operation ie. heat, cool, idle."""
return self._current_operation
@property
def hvac_modes(self) -> list[HVACMode]:
"""Return the list of available operation modes."""
return self._config[CONF_MODE_LIST]
@property
def target_temperature_step(self) -> float:
"""Return the supported step of target temperature."""
return self._config[CONF_TEMP_STEP]
@property
def preset_mode(self) -> str | None:
"""Return preset mode."""
if self._feature_preset_mode and self._preset_mode is not None:
return self._preset_mode
return PRESET_NONE
@property
def preset_modes(self) -> list[str]:
"""Return preset modes."""
presets = []
presets.extend(self._preset_modes)
if presets:
presets.insert(0, PRESET_NONE)
return presets
@property
def is_aux_heat(self) -> bool | None:
"""Return true if away mode is on."""
return self._aux
@property
def fan_mode(self) -> str | None:
"""Return the fan setting."""
return self._current_fan_mode
@property
def fan_modes(self) -> list[str]:
"""Return the list of available fan modes."""
return self._config[CONF_FAN_MODE_LIST]
async def _publish(self, topic, payload):
if self._topic[topic] is not None:
await self.async_publish(
self._topic[topic],
payload,
self._config[CONF_QOS],
self._config[CONF_RETAIN],
self._config[CONF_ENCODING],
)
async def _set_temperature(
self, temp, cmnd_topic, cmnd_template, state_topic, attr
):
if temp is not None:
if self._topic[state_topic] is None:
# optimistic mode
setattr(self, attr, temp)
payload = self._command_templates[cmnd_template](temp)
await self._publish(cmnd_topic, payload)
async def async_set_temperature(self, **kwargs: Any) -> None:
"""Set new target temperatures."""
if (operation_mode := kwargs.get(ATTR_HVAC_MODE)) is not None:
await self.async_set_hvac_mode(operation_mode)
await self._set_temperature(
kwargs.get(ATTR_TEMPERATURE),
CONF_TEMP_COMMAND_TOPIC,
CONF_TEMP_COMMAND_TEMPLATE,
CONF_TEMP_STATE_TOPIC,
"_target_temp",
)
await self._set_temperature(
kwargs.get(ATTR_TARGET_TEMP_LOW),
CONF_TEMP_LOW_COMMAND_TOPIC,
CONF_TEMP_LOW_COMMAND_TEMPLATE,
CONF_TEMP_LOW_STATE_TOPIC,
"_target_temp_low",
)
await self._set_temperature(
kwargs.get(ATTR_TARGET_TEMP_HIGH),
CONF_TEMP_HIGH_COMMAND_TOPIC,
CONF_TEMP_HIGH_COMMAND_TEMPLATE,
CONF_TEMP_HIGH_STATE_TOPIC,
"_target_temp_high",
)
self.async_write_ha_state()
async def async_set_swing_mode(self, swing_mode: str) -> None:
"""Set new swing mode."""
payload = self._command_templates[CONF_SWING_MODE_COMMAND_TEMPLATE](swing_mode)
await self._publish(CONF_SWING_MODE_COMMAND_TOPIC, payload)
if self._topic[CONF_SWING_MODE_STATE_TOPIC] is None:
self._current_swing_mode = swing_mode
self.async_write_ha_state()
async def async_set_fan_mode(self, fan_mode: str) -> None:
"""Set new target temperature."""
payload = self._command_templates[CONF_FAN_MODE_COMMAND_TEMPLATE](fan_mode)
await self._publish(CONF_FAN_MODE_COMMAND_TOPIC, payload)
if self._topic[CONF_FAN_MODE_STATE_TOPIC] is None:
self._current_fan_mode = fan_mode
self.async_write_ha_state()
async def async_set_hvac_mode(self, hvac_mode: HVACMode) -> None:
"""Set new operation mode."""
if hvac_mode == HVACMode.OFF:
await self._publish(
CONF_POWER_COMMAND_TOPIC, self._config[CONF_PAYLOAD_OFF]
)
else:
await self._publish(CONF_POWER_COMMAND_TOPIC, self._config[CONF_PAYLOAD_ON])
payload = self._command_templates[CONF_MODE_COMMAND_TEMPLATE](hvac_mode)
await self._publish(CONF_MODE_COMMAND_TOPIC, payload)
if self._topic[CONF_MODE_STATE_TOPIC] is None:
self._current_operation = hvac_mode
self.async_write_ha_state()
@property
def swing_mode(self) -> str | None:
"""Return the swing setting."""
return self._current_swing_mode
@property
def swing_modes(self) -> list[str]:
"""List of available swing modes."""
return self._config[CONF_SWING_MODE_LIST]
async def async_set_preset_mode(self, preset_mode: str) -> None:
"""Set a preset mode."""
if self._feature_preset_mode:
if preset_mode not in self.preset_modes and preset_mode is not PRESET_NONE:
_LOGGER.warning("'%s' is not a valid preset mode", preset_mode)
return
mqtt_payload = self._command_templates[CONF_PRESET_MODE_COMMAND_TEMPLATE](
preset_mode
)
await self._publish(
CONF_PRESET_MODE_COMMAND_TOPIC,
mqtt_payload,
)
if self._optimistic_preset_mode:
self._preset_mode = preset_mode if preset_mode != PRESET_NONE else None
self.async_write_ha_state()
return
async def _set_aux_heat(self, state):
await self._publish(
CONF_AUX_COMMAND_TOPIC,
self._config[CONF_PAYLOAD_ON] if state else self._config[CONF_PAYLOAD_OFF],
)
if self._topic[CONF_AUX_STATE_TOPIC] is None:
self._aux = state
self.async_write_ha_state()
async def async_turn_aux_heat_on(self) -> None:
"""Turn auxiliary heater on."""
await self._set_aux_heat(True)
async def async_turn_aux_heat_off(self) -> None:
"""Turn auxiliary heater off."""
await self._set_aux_heat(False)
@property
def supported_features(self) -> int:
"""Return the list of supported features."""
support = 0
support: int = 0
if (self._topic[CONF_TEMP_STATE_TOPIC] is not None) or (
self._topic[CONF_TEMP_COMMAND_TOPIC] is not None
):
@ -918,22 +496,346 @@ class MqttClimate(MqttEntity, ClimateEntity):
self._topic[CONF_AUX_COMMAND_TOPIC] is not None
):
support |= ClimateEntityFeature.AUX_HEAT
self._attr_supported_features = support
return support
def _prepare_subscribe_topics(self): # noqa: C901
"""(Re)Subscribe to topics."""
topics = {}
qos = self._config[CONF_QOS]
@property
def min_temp(self) -> float:
"""Return the minimum temperature."""
return self._config[CONF_TEMP_MIN]
def add_subscription(topics, topic, msg_callback):
if self._topic[topic] is not None:
topics[topic] = {
"topic": self._topic[topic],
"msg_callback": msg_callback,
"qos": qos,
"encoding": self._config[CONF_ENCODING] or None,
}
@property
def max_temp(self) -> float:
"""Return the maximum temperature."""
return self._config[CONF_TEMP_MAX]
def render_template(msg, template_name):
template = self._value_templates[template_name]
return template(msg.payload)
@property
def precision(self) -> float:
"""Return the precision of the system."""
if (precision := self._config.get(CONF_PRECISION)) is not None:
return precision
return super().precision
@callback
@log_messages(self.hass, self.entity_id)
def handle_action_received(msg):
"""Handle receiving action via MQTT."""
payload = render_template(msg, CONF_ACTION_TEMPLATE)
if not payload or payload == PAYLOAD_NONE:
_LOGGER.debug(
"Invalid %s action: %s, ignoring",
[e.value for e in HVACAction],
payload,
)
return
try:
self._attr_hvac_action = HVACAction(payload)
except ValueError:
_LOGGER.warning(
"Invalid %s action: %s",
[e.value for e in HVACAction],
payload,
)
return
get_mqtt_data(self.hass).state_write_requests.write_state_request(self)
add_subscription(topics, CONF_ACTION_TOPIC, handle_action_received)
@callback
def handle_temperature_received(msg, template_name, attr):
"""Handle temperature coming via MQTT."""
payload = render_template(msg, template_name)
try:
setattr(self, attr, float(payload))
get_mqtt_data(self.hass).state_write_requests.write_state_request(self)
except ValueError:
_LOGGER.error("Could not parse temperature from %s", payload)
@callback
@log_messages(self.hass, self.entity_id)
def handle_current_temperature_received(msg):
"""Handle current temperature coming via MQTT."""
handle_temperature_received(
msg, CONF_CURRENT_TEMP_TEMPLATE, "_attr_current_temperature"
)
add_subscription(
topics, CONF_CURRENT_TEMP_TOPIC, handle_current_temperature_received
)
@callback
@log_messages(self.hass, self.entity_id)
def handle_target_temperature_received(msg):
"""Handle target temperature coming via MQTT."""
handle_temperature_received(
msg, CONF_TEMP_STATE_TEMPLATE, "_attr_target_temperature"
)
add_subscription(
topics, CONF_TEMP_STATE_TOPIC, handle_target_temperature_received
)
@callback
@log_messages(self.hass, self.entity_id)
def handle_temperature_low_received(msg):
"""Handle target temperature low coming via MQTT."""
handle_temperature_received(
msg, CONF_TEMP_LOW_STATE_TEMPLATE, "_attr_target_temperature_low"
)
add_subscription(
topics, CONF_TEMP_LOW_STATE_TOPIC, handle_temperature_low_received
)
@callback
@log_messages(self.hass, self.entity_id)
def handle_temperature_high_received(msg):
"""Handle target temperature high coming via MQTT."""
handle_temperature_received(
msg, CONF_TEMP_HIGH_STATE_TEMPLATE, "_attr_target_temperature_high"
)
add_subscription(
topics, CONF_TEMP_HIGH_STATE_TOPIC, handle_temperature_high_received
)
@callback
def handle_mode_received(msg, template_name, attr, mode_list):
"""Handle receiving listed mode via MQTT."""
payload = render_template(msg, template_name)
if payload not in self._config[mode_list]:
_LOGGER.error("Invalid %s mode: %s", mode_list, payload)
else:
setattr(self, attr, payload)
get_mqtt_data(self.hass).state_write_requests.write_state_request(self)
@callback
@log_messages(self.hass, self.entity_id)
def handle_current_mode_received(msg):
"""Handle receiving mode via MQTT."""
handle_mode_received(
msg, CONF_MODE_STATE_TEMPLATE, "_attr_hvac_mode", CONF_MODE_LIST
)
add_subscription(topics, CONF_MODE_STATE_TOPIC, handle_current_mode_received)
@callback
@log_messages(self.hass, self.entity_id)
def handle_fan_mode_received(msg):
"""Handle receiving fan mode via MQTT."""
handle_mode_received(
msg,
CONF_FAN_MODE_STATE_TEMPLATE,
"_attr_fan_mode",
CONF_FAN_MODE_LIST,
)
add_subscription(topics, CONF_FAN_MODE_STATE_TOPIC, handle_fan_mode_received)
@callback
@log_messages(self.hass, self.entity_id)
def handle_swing_mode_received(msg):
"""Handle receiving swing mode via MQTT."""
handle_mode_received(
msg,
CONF_SWING_MODE_STATE_TEMPLATE,
"_attr_swing_mode",
CONF_SWING_MODE_LIST,
)
add_subscription(
topics, CONF_SWING_MODE_STATE_TOPIC, handle_swing_mode_received
)
@callback
def handle_onoff_mode_received(msg, template_name, attr):
"""Handle receiving on/off mode via MQTT."""
payload = render_template(msg, template_name)
payload_on = self._config[CONF_PAYLOAD_ON]
payload_off = self._config[CONF_PAYLOAD_OFF]
if payload == "True":
payload = payload_on
elif payload == "False":
payload = payload_off
if payload == payload_on:
setattr(self, attr, True)
elif payload == payload_off:
setattr(self, attr, False)
else:
_LOGGER.error("Invalid %s mode: %s", attr, payload)
get_mqtt_data(self.hass).state_write_requests.write_state_request(self)
@callback
@log_messages(self.hass, self.entity_id)
def handle_aux_mode_received(msg):
"""Handle receiving aux mode via MQTT."""
handle_onoff_mode_received(
msg, CONF_AUX_STATE_TEMPLATE, "_attr_is_aux_heat"
)
add_subscription(topics, CONF_AUX_STATE_TOPIC, handle_aux_mode_received)
@callback
@log_messages(self.hass, self.entity_id)
def handle_preset_mode_received(msg):
"""Handle receiving preset mode via MQTT."""
preset_mode = render_template(msg, CONF_PRESET_MODE_VALUE_TEMPLATE)
if preset_mode in [PRESET_NONE, PAYLOAD_NONE]:
self._attr_preset_mode = PRESET_NONE
get_mqtt_data(self.hass).state_write_requests.write_state_request(self)
return
if not preset_mode:
_LOGGER.debug("Ignoring empty preset_mode from '%s'", msg.topic)
return
if preset_mode not in self.preset_modes:
_LOGGER.warning(
"'%s' received on topic %s. '%s' is not a valid preset mode",
msg.payload,
msg.topic,
preset_mode,
)
else:
self._attr_preset_mode = preset_mode
get_mqtt_data(self.hass).state_write_requests.write_state_request(self)
add_subscription(
topics, CONF_PRESET_MODE_STATE_TOPIC, handle_preset_mode_received
)
self._sub_state = subscription.async_prepare_subscribe_topics(
self.hass, self._sub_state, topics
)
async def _subscribe_topics(self):
"""(Re)Subscribe to topics."""
await subscription.async_subscribe_topics(self.hass, self._sub_state)
async def _publish(self, topic, payload):
if self._topic[topic] is not None:
await self.async_publish(
self._topic[topic],
payload,
self._config[CONF_QOS],
self._config[CONF_RETAIN],
self._config[CONF_ENCODING],
)
async def _set_temperature(
self, temp, cmnd_topic, cmnd_template, state_topic, attr
):
if temp is not None:
if self._topic[state_topic] is None:
# optimistic mode
setattr(self, attr, temp)
payload = self._command_templates[cmnd_template](temp)
await self._publish(cmnd_topic, payload)
async def async_set_temperature(self, **kwargs: Any) -> None:
"""Set new target temperatures."""
if (operation_mode := kwargs.get(ATTR_HVAC_MODE)) is not None:
await self.async_set_hvac_mode(operation_mode)
await self._set_temperature(
kwargs.get(ATTR_TEMPERATURE),
CONF_TEMP_COMMAND_TOPIC,
CONF_TEMP_COMMAND_TEMPLATE,
CONF_TEMP_STATE_TOPIC,
"_attr_target_temperature",
)
await self._set_temperature(
kwargs.get(ATTR_TARGET_TEMP_LOW),
CONF_TEMP_LOW_COMMAND_TOPIC,
CONF_TEMP_LOW_COMMAND_TEMPLATE,
CONF_TEMP_LOW_STATE_TOPIC,
"_attr_target_temperature_low",
)
await self._set_temperature(
kwargs.get(ATTR_TARGET_TEMP_HIGH),
CONF_TEMP_HIGH_COMMAND_TOPIC,
CONF_TEMP_HIGH_COMMAND_TEMPLATE,
CONF_TEMP_HIGH_STATE_TOPIC,
"_attr_target_temperature_high",
)
self.async_write_ha_state()
async def async_set_swing_mode(self, swing_mode: str) -> None:
"""Set new swing mode."""
payload = self._command_templates[CONF_SWING_MODE_COMMAND_TEMPLATE](swing_mode)
await self._publish(CONF_SWING_MODE_COMMAND_TOPIC, payload)
if self._topic[CONF_SWING_MODE_STATE_TOPIC] is None:
self._attr_swing_mode = swing_mode
self.async_write_ha_state()
async def async_set_fan_mode(self, fan_mode: str) -> None:
"""Set new target temperature."""
payload = self._command_templates[CONF_FAN_MODE_COMMAND_TEMPLATE](fan_mode)
await self._publish(CONF_FAN_MODE_COMMAND_TOPIC, payload)
if self._topic[CONF_FAN_MODE_STATE_TOPIC] is None:
self._attr_fan_mode = fan_mode
self.async_write_ha_state()
async def async_set_hvac_mode(self, hvac_mode: HVACMode) -> None:
"""Set new operation mode."""
if hvac_mode == HVACMode.OFF:
await self._publish(
CONF_POWER_COMMAND_TOPIC, self._config[CONF_PAYLOAD_OFF]
)
else:
await self._publish(CONF_POWER_COMMAND_TOPIC, self._config[CONF_PAYLOAD_ON])
payload = self._command_templates[CONF_MODE_COMMAND_TEMPLATE](hvac_mode)
await self._publish(CONF_MODE_COMMAND_TOPIC, payload)
if self._topic[CONF_MODE_STATE_TOPIC] is None:
self._attr_hvac_mode = hvac_mode
self.async_write_ha_state()
async def async_set_preset_mode(self, preset_mode: str) -> None:
"""Set a preset mode."""
if self._feature_preset_mode and self.preset_modes:
if preset_mode not in self.preset_modes and preset_mode is not PRESET_NONE:
_LOGGER.warning("'%s' is not a valid preset mode", preset_mode)
return
mqtt_payload = self._command_templates[CONF_PRESET_MODE_COMMAND_TEMPLATE](
preset_mode
)
await self._publish(
CONF_PRESET_MODE_COMMAND_TOPIC,
mqtt_payload,
)
if self._optimistic_preset_mode:
self._attr_preset_mode = preset_mode
self.async_write_ha_state()
return
async def _set_aux_heat(self, state):
await self._publish(
CONF_AUX_COMMAND_TOPIC,
self._config[CONF_PAYLOAD_ON] if state else self._config[CONF_PAYLOAD_OFF],
)
if self._topic[CONF_AUX_STATE_TOPIC] is None:
self._attr_is_aux_heat = state
self.async_write_ha_state()
async def async_turn_aux_heat_on(self) -> None:
"""Turn auxiliary heater on."""
await self._set_aux_heat(True)
async def async_turn_aux_heat_off(self) -> None:
"""Turn auxiliary heater off."""
await self._set_aux_heat(False)