Refactor ranging utils for mqtt cover (#105739)

* Refactor ranging utils for mqtt cover

* Use literals for default min and max percentage
This commit is contained in:
Jan Bouwhuis 2023-12-21 11:54:05 +01:00 committed by GitHub
parent 0614e291c1
commit f8f31627ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 254 additions and 449 deletions

View File

@ -32,6 +32,10 @@ from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.service_info.mqtt import ReceivePayloadType
from homeassistant.helpers.typing import ConfigType
from homeassistant.util.json import JSON_DECODE_EXCEPTIONS, json_loads
from homeassistant.util.percentage import (
percentage_to_ranged_value,
ranged_value_to_percentage,
)
from . import subscription
from .config import MQTT_BASE_SCHEMA
@ -241,6 +245,10 @@ class MqttCover(MqttEntity, CoverEntity):
_entity_id_format: str = cover.ENTITY_ID_FORMAT
_optimistic: bool
_tilt_optimistic: bool
_tilt_closed_percentage: int
_tilt_open_percentage: int
_pos_range: tuple[int, int]
_tilt_range: tuple[int, int]
@staticmethod
def config_schema() -> vol.Schema:
@ -248,6 +256,15 @@ class MqttCover(MqttEntity, CoverEntity):
return DISCOVERY_SCHEMA
def _setup_from_config(self, config: ConfigType) -> None:
"""Set up cover from config."""
self._pos_range = (config[CONF_POSITION_CLOSED] + 1, config[CONF_POSITION_OPEN])
self._tilt_range = (config[CONF_TILT_MIN] + 1, config[CONF_TILT_MAX])
self._tilt_closed_percentage = ranged_value_to_percentage(
self._tilt_range, config[CONF_TILT_CLOSED_POSITION]
)
self._tilt_open_percentage = ranged_value_to_percentage(
self._tilt_range, config[CONF_TILT_OPEN_POSITION]
)
no_position = (
config.get(CONF_SET_POSITION_TOPIC) is None
and config.get(CONF_GET_POSITION_TOPIC) is None
@ -286,23 +303,22 @@ class MqttCover(MqttEntity, CoverEntity):
)
template_config_attributes = {
"position_open": self._config[CONF_POSITION_OPEN],
"position_closed": self._config[CONF_POSITION_CLOSED],
"tilt_min": self._config[CONF_TILT_MIN],
"tilt_max": self._config[CONF_TILT_MAX],
"position_open": config[CONF_POSITION_OPEN],
"position_closed": config[CONF_POSITION_CLOSED],
"tilt_min": config[CONF_TILT_MIN],
"tilt_max": config[CONF_TILT_MAX],
}
self._value_template = MqttValueTemplate(
self._config.get(CONF_VALUE_TEMPLATE),
entity=self,
config.get(CONF_VALUE_TEMPLATE), entity=self
).async_render_with_possible_json_value
self._set_position_template = MqttCommandTemplate(
self._config.get(CONF_SET_POSITION_TEMPLATE), entity=self
config.get(CONF_SET_POSITION_TEMPLATE), entity=self
).async_render
self._get_position_template = MqttValueTemplate(
self._config.get(CONF_GET_POSITION_TEMPLATE),
config.get(CONF_GET_POSITION_TEMPLATE),
entity=self,
config_attributes=template_config_attributes,
).async_render_with_possible_json_value
@ -445,19 +461,17 @@ class MqttCover(MqttEntity, CoverEntity):
payload = payload_dict["position"]
try:
percentage_payload = self.find_percentage_in_range(
float(payload), COVER_PAYLOAD
percentage_payload = ranged_value_to_percentage(
self._pos_range, float(payload)
)
except ValueError:
_LOGGER.warning("Payload '%s' is not numeric", payload)
return
self._attr_current_cover_position = percentage_payload
self._attr_current_cover_position = min(100, max(0, percentage_payload))
if self._config.get(CONF_STATE_TOPIC) is None:
self._update_state(
STATE_CLOSED
if percentage_payload == DEFAULT_POSITION_CLOSED
else STATE_OPEN
STATE_CLOSED if self.current_cover_position == 0 else STATE_OPEN
)
if self._config.get(CONF_GET_POSITION_TOPIC):
@ -508,9 +522,7 @@ class MqttCover(MqttEntity, CoverEntity):
# Optimistically assume that cover has changed state.
self._update_state(STATE_OPEN)
if self._config.get(CONF_GET_POSITION_TOPIC):
self._attr_current_cover_position = self.find_percentage_in_range(
self._config[CONF_POSITION_OPEN], COVER_PAYLOAD
)
self._attr_current_cover_position = 100
self.async_write_ha_state()
async def async_close_cover(self, **kwargs: Any) -> None:
@ -529,9 +541,7 @@ class MqttCover(MqttEntity, CoverEntity):
# Optimistically assume that cover has changed state.
self._update_state(STATE_CLOSED)
if self._config.get(CONF_GET_POSITION_TOPIC):
self._attr_current_cover_position = self.find_percentage_in_range(
self._config[CONF_POSITION_CLOSED], COVER_PAYLOAD
)
self._attr_current_cover_position = 0
self.async_write_ha_state()
async def async_stop_cover(self, **kwargs: Any) -> None:
@ -567,9 +577,7 @@ class MqttCover(MqttEntity, CoverEntity):
self._config[CONF_ENCODING],
)
if self._tilt_optimistic:
self._attr_current_cover_tilt_position = self.find_percentage_in_range(
float(self._config[CONF_TILT_OPEN_POSITION])
)
self._attr_current_cover_tilt_position = self._tilt_open_percentage
self.async_write_ha_state()
async def async_close_cover_tilt(self, **kwargs: Any) -> None:
@ -594,58 +602,60 @@ class MqttCover(MqttEntity, CoverEntity):
self._config[CONF_ENCODING],
)
if self._tilt_optimistic:
self._attr_current_cover_tilt_position = self.find_percentage_in_range(
float(self._config[CONF_TILT_CLOSED_POSITION])
)
self._attr_current_cover_tilt_position = self._tilt_closed_percentage
self.async_write_ha_state()
async def async_set_cover_tilt_position(self, **kwargs: Any) -> None:
"""Move the cover tilt to a specific position."""
tilt = kwargs[ATTR_TILT_POSITION]
percentage_tilt = tilt
tilt = self.find_in_range_from_percent(tilt)
tilt_percentage = kwargs[ATTR_TILT_POSITION]
tilt_ranged = round(
percentage_to_ranged_value(self._tilt_range, tilt_percentage)
)
# Handover the tilt after calculated from percent would make it more
# consistent with receiving templates
variables = {
"tilt_position": percentage_tilt,
"tilt_position": tilt_percentage,
"entity_id": self.entity_id,
"position_open": self._config.get(CONF_POSITION_OPEN),
"position_closed": self._config.get(CONF_POSITION_CLOSED),
"tilt_min": self._config.get(CONF_TILT_MIN),
"tilt_max": self._config.get(CONF_TILT_MAX),
}
tilt = self._set_tilt_template(tilt, variables=variables)
tilt_rendered = self._set_tilt_template(tilt_ranged, variables=variables)
await self.async_publish(
self._config[CONF_TILT_COMMAND_TOPIC],
tilt,
tilt_rendered,
self._config[CONF_QOS],
self._config[CONF_RETAIN],
self._config[CONF_ENCODING],
)
if self._tilt_optimistic:
_LOGGER.debug("Set tilt value optimistic")
self._attr_current_cover_tilt_position = percentage_tilt
self._attr_current_cover_tilt_position = tilt_percentage
self.async_write_ha_state()
async def async_set_cover_position(self, **kwargs: Any) -> None:
"""Move the cover to a specific position."""
position = kwargs[ATTR_POSITION]
percentage_position = position
position = self.find_in_range_from_percent(position, COVER_PAYLOAD)
position_percentage = kwargs[ATTR_POSITION]
position_ranged = round(
percentage_to_ranged_value(self._pos_range, position_percentage)
)
variables = {
"position": percentage_position,
"position": position_percentage,
"entity_id": self.entity_id,
"position_open": self._config[CONF_POSITION_OPEN],
"position_closed": self._config[CONF_POSITION_CLOSED],
"tilt_min": self._config[CONF_TILT_MIN],
"tilt_max": self._config[CONF_TILT_MAX],
}
position = self._set_position_template(position, variables=variables)
position_rendered = self._set_position_template(
position_ranged, variables=variables
)
await self.async_publish(
self._config[CONF_SET_POSITION_TOPIC],
position,
position_rendered,
self._config[CONF_QOS],
self._config[CONF_RETAIN],
self._config[CONF_ENCODING],
@ -653,87 +663,37 @@ class MqttCover(MqttEntity, CoverEntity):
if self._optimistic:
self._update_state(
STATE_CLOSED
if percentage_position == self._config[CONF_POSITION_CLOSED]
if position_percentage <= self._config[CONF_POSITION_CLOSED]
else STATE_OPEN
)
self._attr_current_cover_position = percentage_position
self._attr_current_cover_position = position_percentage
self.async_write_ha_state()
async def async_toggle_tilt(self, **kwargs: Any) -> None:
"""Toggle the entity."""
if self.is_tilt_closed():
if (
self.current_cover_tilt_position is not None
and self.current_cover_tilt_position <= self._tilt_closed_percentage
):
await self.async_open_cover_tilt(**kwargs)
else:
await self.async_close_cover_tilt(**kwargs)
def is_tilt_closed(self) -> bool:
"""Return if the cover is tilted closed."""
return self._attr_current_cover_tilt_position == self.find_percentage_in_range(
float(self._config[CONF_TILT_CLOSED_POSITION])
)
def find_percentage_in_range(
self, position: float, range_type: str = TILT_PAYLOAD
) -> int:
"""Find the 0-100% value within the specified range."""
# the range of motion as defined by the min max values
if range_type == COVER_PAYLOAD:
max_range: int = self._config[CONF_POSITION_OPEN]
min_range: int = self._config[CONF_POSITION_CLOSED]
else:
max_range = self._config[CONF_TILT_MAX]
min_range = self._config[CONF_TILT_MIN]
current_range = max_range - min_range
# offset to be zero based
offset_position = position - min_range
position_percentage = round(float(offset_position) / current_range * 100.0)
max_percent = 100
min_percent = 0
position_percentage = min(max(position_percentage, min_percent), max_percent)
return position_percentage
def find_in_range_from_percent(
self, percentage: float, range_type: str = TILT_PAYLOAD
) -> int:
"""Find the adjusted value for 0-100% within the specified range.
if the range is 80-180 and the percentage is 90
this method would determine the value to send on the topic
by offsetting the max and min, getting the percentage value and
returning the offset
"""
if range_type == COVER_PAYLOAD:
max_range: int = self._config[CONF_POSITION_OPEN]
min_range: int = self._config[CONF_POSITION_CLOSED]
else:
max_range = self._config[CONF_TILT_MAX]
min_range = self._config[CONF_TILT_MIN]
offset = min_range
current_range = max_range - min_range
position = round(current_range * (percentage / 100.0))
position += offset
return position
@callback
def tilt_payload_received(self, _payload: Any) -> None:
"""Set the tilt value."""
try:
payload = int(round(float(_payload)))
payload = round(float(_payload))
except ValueError:
_LOGGER.warning("Payload '%s' is not numeric", _payload)
return
if (
self._config[CONF_TILT_MIN] <= int(payload) <= self._config[CONF_TILT_MAX]
or self._config[CONF_TILT_MAX]
<= int(payload)
<= self._config[CONF_TILT_MIN]
self._config[CONF_TILT_MIN] <= payload <= self._config[CONF_TILT_MAX]
or self._config[CONF_TILT_MAX] <= payload <= self._config[CONF_TILT_MIN]
):
level = self.find_percentage_in_range(payload)
level = ranged_value_to_percentage(self._tilt_range, payload)
self._attr_current_cover_tilt_position = level
else:
_LOGGER.warning(

View File

@ -23,7 +23,6 @@ from homeassistant.components.mqtt.cover import (
CONF_TILT_STATUS_TEMPLATE,
CONF_TILT_STATUS_TOPIC,
MQTT_COVER_ATTRIBUTES_BLOCKED,
MqttCover,
)
from homeassistant.const import (
ATTR_ASSUMED_STATE,
@ -197,8 +196,21 @@ async def test_opening_and_closing_state_via_custom_state_payload(
}
],
)
@pytest.mark.parametrize(
("position", "assert_state"),
[
(0, STATE_CLOSED),
(1, STATE_OPEN),
(30, STATE_OPEN),
(99, STATE_OPEN),
(100, STATE_OPEN),
],
)
async def test_open_closed_state_from_position_optimistic(
hass: HomeAssistant, mqtt_mock_entry: MqttMockHAClientGenerator
hass: HomeAssistant,
mqtt_mock_entry: MqttMockHAClientGenerator,
position: int,
assert_state: str,
) -> None:
"""Test the state after setting the position using optimistic mode."""
await mqtt_mock_entry()
@ -209,24 +221,201 @@ async def test_open_closed_state_from_position_optimistic(
await hass.services.async_call(
cover.DOMAIN,
SERVICE_SET_COVER_POSITION,
{ATTR_ENTITY_ID: "cover.test", ATTR_POSITION: 0},
{ATTR_ENTITY_ID: "cover.test", ATTR_POSITION: position},
blocking=True,
)
state = hass.states.get("cover.test")
assert state.state == STATE_CLOSED
assert state.state == assert_state
assert state.attributes.get(ATTR_ASSUMED_STATE)
assert state.attributes.get(ATTR_CURRENT_POSITION) == position
@pytest.mark.parametrize(
"hass_config",
[
{
mqtt.DOMAIN: {
cover.DOMAIN: {
"name": "test",
"position_topic": "position-topic",
"set_position_topic": "set-position-topic",
"qos": 0,
"payload_open": "OPEN",
"payload_close": "CLOSE",
"payload_stop": "STOP",
"optimistic": True,
"position_closed": 10,
"position_open": 90,
}
}
}
],
)
@pytest.mark.parametrize(
("position", "assert_state"),
[
(0, STATE_CLOSED),
(1, STATE_CLOSED),
(10, STATE_CLOSED),
(11, STATE_OPEN),
(30, STATE_OPEN),
(99, STATE_OPEN),
(100, STATE_OPEN),
],
)
async def test_open_closed_state_from_position_optimistic_alt_positions(
hass: HomeAssistant,
mqtt_mock_entry: MqttMockHAClientGenerator,
position: int,
assert_state: str,
) -> None:
"""Test the state after setting the position.
Test with alt opened and closed positions using optimistic mode.
"""
await mqtt_mock_entry()
state = hass.states.get("cover.test")
assert state.state == STATE_UNKNOWN
await hass.services.async_call(
cover.DOMAIN,
SERVICE_SET_COVER_POSITION,
{ATTR_ENTITY_ID: "cover.test", ATTR_POSITION: 100},
{ATTR_ENTITY_ID: "cover.test", ATTR_POSITION: position},
blocking=True,
)
state = hass.states.get("cover.test")
assert state.state == STATE_OPEN
assert state.state == assert_state
assert state.attributes.get(ATTR_ASSUMED_STATE)
assert state.attributes.get(ATTR_CURRENT_POSITION) == position
@pytest.mark.parametrize(
"hass_config",
[
{
mqtt.DOMAIN: {
cover.DOMAIN: {
"name": "test",
"tilt_command_topic": "set-position-topic",
"qos": 0,
"payload_open": "OPEN",
"payload_close": "CLOSE",
"payload_stop": "STOP",
"optimistic": True,
}
}
}
],
)
@pytest.mark.parametrize(
("tilt_position", "tilt_toggled_position"),
[(0, 100), (1, 0), (99, 0), (100, 0)],
)
async def test_tilt_open_closed_toggle_optimistic(
hass: HomeAssistant,
mqtt_mock_entry: MqttMockHAClientGenerator,
tilt_position: int,
tilt_toggled_position: int,
) -> None:
"""Test the tilt state after setting and toggling the tilt position.
Test opened and closed tilt positions using optimistic mode.
"""
await mqtt_mock_entry()
state = hass.states.get("cover.test")
assert state.state == STATE_UNKNOWN
await hass.services.async_call(
cover.DOMAIN,
SERVICE_SET_COVER_TILT_POSITION,
{ATTR_ENTITY_ID: "cover.test", ATTR_TILT_POSITION: tilt_position},
blocking=True,
)
state = hass.states.get("cover.test")
assert state.attributes.get(ATTR_ASSUMED_STATE)
assert state.attributes.get(ATTR_CURRENT_TILT_POSITION) == tilt_position
# toggle cover tilt
await hass.services.async_call(
cover.DOMAIN,
SERVICE_TOGGLE_COVER_TILT,
{ATTR_ENTITY_ID: "cover.test"},
blocking=True,
)
state = hass.states.get("cover.test")
assert state.attributes.get(ATTR_ASSUMED_STATE)
assert state.attributes.get(ATTR_CURRENT_TILT_POSITION) == tilt_toggled_position
@pytest.mark.parametrize(
"hass_config",
[
{
mqtt.DOMAIN: {
cover.DOMAIN: {
"name": "test",
"tilt_command_topic": "set-position-topic",
"qos": 0,
"payload_open": "OPEN",
"payload_close": "CLOSE",
"payload_stop": "STOP",
"optimistic": True,
"tilt_min": 5,
"tilt_max": 95,
"tilt_closed_value": 15,
"tilt_opened_value": 85,
}
}
}
],
)
@pytest.mark.parametrize(
("tilt_position", "tilt_toggled_position"),
[(0, 88), (11, 88), (12, 11), (30, 11), (90, 11), (100, 11)],
)
async def test_tilt_open_closed_toggle_optimistic_alt_positions(
hass: HomeAssistant,
mqtt_mock_entry: MqttMockHAClientGenerator,
tilt_position: int,
tilt_toggled_position: int,
) -> None:
"""Test the tilt state after setting and toggling the tilt position.
Test with alt opened and closed tilt positions using optimistic mode.
"""
await mqtt_mock_entry()
state = hass.states.get("cover.test")
assert state.state == STATE_UNKNOWN
await hass.services.async_call(
cover.DOMAIN,
SERVICE_SET_COVER_TILT_POSITION,
{ATTR_ENTITY_ID: "cover.test", ATTR_TILT_POSITION: tilt_position},
blocking=True,
)
state = hass.states.get("cover.test")
assert state.attributes.get(ATTR_ASSUMED_STATE)
assert state.attributes.get(ATTR_CURRENT_TILT_POSITION) == tilt_position
# toggle cover tilt
await hass.services.async_call(
cover.DOMAIN,
SERVICE_TOGGLE_COVER_TILT,
{ATTR_ENTITY_ID: "cover.test"},
blocking=True,
)
state = hass.states.get("cover.test")
assert state.attributes.get(ATTR_ASSUMED_STATE)
assert state.attributes.get(ATTR_CURRENT_TILT_POSITION) == tilt_toggled_position
@pytest.mark.parametrize(
@ -2236,350 +2425,6 @@ async def test_tilt_position_altered_range(
)
async def test_find_percentage_in_range_defaults(hass: HomeAssistant) -> None:
"""Test find percentage in range with default range."""
mqtt_cover = MqttCover(
hass,
{
"name": "cover.test",
"state_topic": "state-topic",
"get_position_topic": None,
"command_topic": "command-topic",
"availability_topic": None,
"tilt_command_topic": "tilt-command-topic",
"tilt_status_topic": "tilt-status-topic",
"qos": 0,
"retain": False,
"state_open": "OPEN",
"state_closed": "CLOSE",
"position_open": 100,
"position_closed": 0,
"payload_open": "OPEN",
"payload_close": "CLOSE",
"payload_stop": "STOP",
"payload_available": None,
"payload_not_available": None,
"optimistic": False,
"value_template": None,
"tilt_open_position": 100,
"tilt_closed_position": 0,
"tilt_min": 0,
"tilt_max": 100,
"tilt_optimistic": False,
"set_position_topic": None,
"set_position_template": None,
"unique_id": None,
"device_config": None,
},
None,
None,
)
assert mqtt_cover.find_percentage_in_range(44) == 44
assert mqtt_cover.find_percentage_in_range(44, "cover") == 44
async def test_find_percentage_in_range_altered(hass: HomeAssistant) -> None:
"""Test find percentage in range with altered range."""
mqtt_cover = MqttCover(
hass,
{
"name": "cover.test",
"state_topic": "state-topic",
"get_position_topic": None,
"command_topic": "command-topic",
"availability_topic": None,
"tilt_command_topic": "tilt-command-topic",
"tilt_status_topic": "tilt-status-topic",
"qos": 0,
"retain": False,
"state_open": "OPEN",
"state_closed": "CLOSE",
"position_open": 180,
"position_closed": 80,
"payload_open": "OPEN",
"payload_close": "CLOSE",
"payload_stop": "STOP",
"payload_available": None,
"payload_not_available": None,
"optimistic": False,
"value_template": None,
"tilt_open_position": 180,
"tilt_closed_position": 80,
"tilt_min": 80,
"tilt_max": 180,
"tilt_optimistic": False,
"set_position_topic": None,
"set_position_template": None,
"unique_id": None,
"device_config": None,
},
None,
None,
)
assert mqtt_cover.find_percentage_in_range(120) == 40
assert mqtt_cover.find_percentage_in_range(120, "cover") == 40
async def test_find_percentage_in_range_defaults_inverted(hass: HomeAssistant) -> None:
"""Test find percentage in range with default range but inverted."""
mqtt_cover = MqttCover(
hass,
{
"name": "cover.test",
"state_topic": "state-topic",
"get_position_topic": None,
"command_topic": "command-topic",
"availability_topic": None,
"tilt_command_topic": "tilt-command-topic",
"tilt_status_topic": "tilt-status-topic",
"qos": 0,
"retain": False,
"state_open": "OPEN",
"state_closed": "CLOSE",
"position_open": 0,
"position_closed": 100,
"payload_open": "OPEN",
"payload_close": "CLOSE",
"payload_stop": "STOP",
"payload_available": None,
"payload_not_available": None,
"optimistic": False,
"value_template": None,
"tilt_open_position": 100,
"tilt_closed_position": 0,
"tilt_min": 100,
"tilt_max": 0,
"tilt_optimistic": False,
"set_position_topic": None,
"set_position_template": None,
"unique_id": None,
"device_config": None,
},
None,
None,
)
assert mqtt_cover.find_percentage_in_range(44) == 56
assert mqtt_cover.find_percentage_in_range(44, "cover") == 56
async def test_find_percentage_in_range_altered_inverted(hass: HomeAssistant) -> None:
"""Test find percentage in range with altered range and inverted."""
mqtt_cover = MqttCover(
hass,
{
"name": "cover.test",
"state_topic": "state-topic",
"get_position_topic": None,
"command_topic": "command-topic",
"availability_topic": None,
"tilt_command_topic": "tilt-command-topic",
"tilt_status_topic": "tilt-status-topic",
"qos": 0,
"retain": False,
"state_open": "OPEN",
"state_closed": "CLOSE",
"position_open": 80,
"position_closed": 180,
"payload_open": "OPEN",
"payload_close": "CLOSE",
"payload_stop": "STOP",
"payload_available": None,
"payload_not_available": None,
"optimistic": False,
"value_template": None,
"tilt_open_position": 180,
"tilt_closed_position": 80,
"tilt_min": 180,
"tilt_max": 80,
"tilt_optimistic": False,
"set_position_topic": None,
"set_position_template": None,
"unique_id": None,
"device_config": None,
},
None,
None,
)
assert mqtt_cover.find_percentage_in_range(120) == 60
assert mqtt_cover.find_percentage_in_range(120, "cover") == 60
async def test_find_in_range_defaults(hass: HomeAssistant) -> None:
"""Test find in range with default range."""
mqtt_cover = MqttCover(
hass,
{
"name": "cover.test",
"state_topic": "state-topic",
"get_position_topic": None,
"command_topic": "command-topic",
"availability_topic": None,
"tilt_command_topic": "tilt-command-topic",
"tilt_status_topic": "tilt-status-topic",
"qos": 0,
"retain": False,
"state_open": "OPEN",
"state_closed": "CLOSE",
"position_open": 100,
"position_closed": 0,
"payload_open": "OPEN",
"payload_close": "CLOSE",
"payload_stop": "STOP",
"payload_available": None,
"payload_not_available": None,
"optimistic": False,
"value_template": None,
"tilt_open_position": 100,
"tilt_closed_position": 0,
"tilt_min": 0,
"tilt_max": 100,
"tilt_optimistic": False,
"set_position_topic": None,
"set_position_template": None,
"unique_id": None,
"device_config": None,
},
None,
None,
)
assert mqtt_cover.find_in_range_from_percent(44) == 44
assert mqtt_cover.find_in_range_from_percent(44, "cover") == 44
async def test_find_in_range_altered(hass: HomeAssistant) -> None:
"""Test find in range with altered range."""
mqtt_cover = MqttCover(
hass,
{
"name": "cover.test",
"state_topic": "state-topic",
"get_position_topic": None,
"command_topic": "command-topic",
"availability_topic": None,
"tilt_command_topic": "tilt-command-topic",
"tilt_status_topic": "tilt-status-topic",
"qos": 0,
"retain": False,
"state_open": "OPEN",
"state_closed": "CLOSE",
"position_open": 180,
"position_closed": 80,
"payload_open": "OPEN",
"payload_close": "CLOSE",
"payload_stop": "STOP",
"payload_available": None,
"payload_not_available": None,
"optimistic": False,
"value_template": None,
"tilt_open_position": 180,
"tilt_closed_position": 80,
"tilt_min": 80,
"tilt_max": 180,
"tilt_optimistic": False,
"set_position_topic": None,
"set_position_template": None,
"unique_id": None,
"device_config": None,
},
None,
None,
)
assert mqtt_cover.find_in_range_from_percent(40) == 120
assert mqtt_cover.find_in_range_from_percent(40, "cover") == 120
async def test_find_in_range_defaults_inverted(hass: HomeAssistant) -> None:
"""Test find in range with default range but inverted."""
mqtt_cover = MqttCover(
hass,
{
"name": "cover.test",
"state_topic": "state-topic",
"get_position_topic": None,
"command_topic": "command-topic",
"availability_topic": None,
"tilt_command_topic": "tilt-command-topic",
"tilt_status_topic": "tilt-status-topic",
"qos": 0,
"retain": False,
"state_open": "OPEN",
"state_closed": "CLOSE",
"position_open": 0,
"position_closed": 100,
"payload_open": "OPEN",
"payload_close": "CLOSE",
"payload_stop": "STOP",
"payload_available": None,
"payload_not_available": None,
"optimistic": False,
"value_template": None,
"tilt_open_position": 100,
"tilt_closed_position": 0,
"tilt_min": 100,
"tilt_max": 0,
"tilt_optimistic": False,
"set_position_topic": None,
"set_position_template": None,
"unique_id": None,
"device_config": None,
},
None,
None,
)
assert mqtt_cover.find_in_range_from_percent(56) == 44
assert mqtt_cover.find_in_range_from_percent(56, "cover") == 44
async def test_find_in_range_altered_inverted(hass: HomeAssistant) -> None:
"""Test find in range with altered range and inverted."""
mqtt_cover = MqttCover(
hass,
{
"name": "cover.test",
"state_topic": "state-topic",
"get_position_topic": None,
"command_topic": "command-topic",
"availability_topic": None,
"tilt_command_topic": "tilt-command-topic",
"tilt_status_topic": "tilt-status-topic",
"qos": 0,
"retain": False,
"state_open": "OPEN",
"state_closed": "CLOSE",
"position_open": 80,
"position_closed": 180,
"payload_open": "OPEN",
"payload_close": "CLOSE",
"payload_stop": "STOP",
"payload_available": None,
"payload_not_available": None,
"optimistic": False,
"value_template": None,
"tilt_open_position": 180,
"tilt_closed_position": 80,
"tilt_min": 180,
"tilt_max": 80,
"tilt_optimistic": False,
"set_position_topic": None,
"set_position_template": None,
"unique_id": None,
"device_config": None,
},
None,
None,
)
assert mqtt_cover.find_in_range_from_percent(60) == 120
assert mqtt_cover.find_in_range_from_percent(60, "cover") == 120
@pytest.mark.parametrize("hass_config", [DEFAULT_CONFIG])
async def test_availability_when_connection_lost(
hass: HomeAssistant, mqtt_mock_entry: MqttMockHAClientGenerator