Update homekit to use new fan entity model (#45549)

This commit is contained in:
J. Nick Koston 2021-01-28 05:38:18 -06:00 committed by GitHub
parent f1c24939f3
commit 3ff75eee53
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 46 additions and 195 deletions

View File

@ -6,14 +6,13 @@ from pyhap.const import CATEGORY_FAN
from homeassistant.components.fan import ( from homeassistant.components.fan import (
ATTR_DIRECTION, ATTR_DIRECTION,
ATTR_OSCILLATING, ATTR_OSCILLATING,
ATTR_SPEED, ATTR_PERCENTAGE,
ATTR_SPEED_LIST,
DIRECTION_FORWARD, DIRECTION_FORWARD,
DIRECTION_REVERSE, DIRECTION_REVERSE,
DOMAIN, DOMAIN,
SERVICE_OSCILLATE, SERVICE_OSCILLATE,
SERVICE_SET_DIRECTION, SERVICE_SET_DIRECTION,
SERVICE_SET_SPEED, SERVICE_SET_PERCENTAGE,
SUPPORT_DIRECTION, SUPPORT_DIRECTION,
SUPPORT_OSCILLATE, SUPPORT_OSCILLATE,
SUPPORT_SET_SPEED, SUPPORT_SET_SPEED,
@ -36,7 +35,6 @@ from .const import (
CHAR_SWING_MODE, CHAR_SWING_MODE,
SERV_FANV2, SERV_FANV2,
) )
from .util import HomeKitSpeedMapping
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -61,10 +59,6 @@ class Fan(HomeAccessory):
if features & SUPPORT_OSCILLATE: if features & SUPPORT_OSCILLATE:
chars.append(CHAR_SWING_MODE) chars.append(CHAR_SWING_MODE)
if features & SUPPORT_SET_SPEED: if features & SUPPORT_SET_SPEED:
speed_list = self.hass.states.get(self.entity_id).attributes.get(
ATTR_SPEED_LIST
)
self.speed_mapping = HomeKitSpeedMapping(speed_list)
chars.append(CHAR_ROTATION_SPEED) chars.append(CHAR_ROTATION_SPEED)
serv_fan = self.add_preload_service(SERV_FANV2, chars) serv_fan = self.add_preload_service(SERV_FANV2, chars)
@ -117,7 +111,7 @@ class Fan(HomeAccessory):
# We always do this LAST to ensure they # We always do this LAST to ensure they
# get the speed they asked for # get the speed they asked for
if CHAR_ROTATION_SPEED in char_values: if CHAR_ROTATION_SPEED in char_values:
self.set_speed(char_values[CHAR_ROTATION_SPEED]) self.set_percentage(char_values[CHAR_ROTATION_SPEED])
def set_state(self, value): def set_state(self, value):
"""Set state if call came from HomeKit.""" """Set state if call came from HomeKit."""
@ -140,12 +134,11 @@ class Fan(HomeAccessory):
params = {ATTR_ENTITY_ID: self.entity_id, ATTR_OSCILLATING: oscillating} params = {ATTR_ENTITY_ID: self.entity_id, ATTR_OSCILLATING: oscillating}
self.call_service(DOMAIN, SERVICE_OSCILLATE, params, oscillating) self.call_service(DOMAIN, SERVICE_OSCILLATE, params, oscillating)
def set_speed(self, value): def set_percentage(self, value):
"""Set state if call came from HomeKit.""" """Set state if call came from HomeKit."""
_LOGGER.debug("%s: Set speed to %d", self.entity_id, value) _LOGGER.debug("%s: Set speed to %d", self.entity_id, value)
speed = self.speed_mapping.speed_to_states(value) params = {ATTR_ENTITY_ID: self.entity_id, ATTR_PERCENTAGE: value}
params = {ATTR_ENTITY_ID: self.entity_id, ATTR_SPEED: speed} self.call_service(DOMAIN, SERVICE_SET_PERCENTAGE, params, value)
self.call_service(DOMAIN, SERVICE_SET_SPEED, params, speed)
@callback @callback
def async_update_state(self, new_state): def async_update_state(self, new_state):
@ -169,24 +162,22 @@ class Fan(HomeAccessory):
if self.char_speed is not None and state != STATE_OFF: if self.char_speed is not None and state != STATE_OFF:
# We do not change the homekit speed when turning off # We do not change the homekit speed when turning off
# as it will clear the restore state # as it will clear the restore state
speed = new_state.attributes.get(ATTR_SPEED) percentage = new_state.attributes.get(ATTR_PERCENTAGE)
hk_speed_value = self.speed_mapping.speed_to_homekit(speed) # If the homeassistant component reports its speed as the first entry
if hk_speed_value is not None and self.char_speed.value != hk_speed_value: # in its speed list but is not off, the hk_speed_value is 0. But 0
# If the homeassistant component reports its speed as the first entry # is a special value in homekit. When you turn on a homekit accessory
# in its speed list but is not off, the hk_speed_value is 0. But 0 # it will try to restore the last rotation speed state which will be
# is a special value in homekit. When you turn on a homekit accessory # the last value saved by char_speed.set_value. But if it is set to
# it will try to restore the last rotation speed state which will be # 0, HomeKit will update the rotation speed to 100 as it thinks 0 is
# the last value saved by char_speed.set_value. But if it is set to # off.
# 0, HomeKit will update the rotation speed to 100 as it thinks 0 is #
# off. # Therefore, if the hk_speed_value is 0 and the device is still on,
# # the rotation speed is mapped to 1 otherwise the update is ignored
# Therefore, if the hk_speed_value is 0 and the device is still on, # in order to avoid this incorrect behavior.
# the rotation speed is mapped to 1 otherwise the update is ignored if percentage == 0 and state == STATE_ON:
# in order to avoid this incorrect behavior. percentage = 1
if hk_speed_value == 0 and state == STATE_ON: if percentage is not None and self.char_speed.value != percentage:
hk_speed_value = 1 self.char_speed.set_value(percentage)
if self.char_speed.value != hk_speed_value:
self.char_speed.set_value(hk_speed_value)
# Handle Oscillating # Handle Oscillating
if self.char_swing is not None: if self.char_swing is not None:

View File

@ -1,5 +1,4 @@
"""Collection of useful functions for the HomeKit component.""" """Collection of useful functions for the HomeKit component."""
from collections import OrderedDict, namedtuple
import io import io
import ipaddress import ipaddress
import logging import logging
@ -11,7 +10,7 @@ import socket
import pyqrcode import pyqrcode
import voluptuous as vol import voluptuous as vol
from homeassistant.components import binary_sensor, fan, media_player, sensor from homeassistant.components import binary_sensor, media_player, sensor
from homeassistant.const import ( from homeassistant.const import (
ATTR_CODE, ATTR_CODE,
ATTR_SUPPORTED_FEATURES, ATTR_SUPPORTED_FEATURES,
@ -310,56 +309,6 @@ def validate_media_player_features(state, feature_list):
return True return True
SpeedRange = namedtuple("SpeedRange", ("start", "target"))
SpeedRange.__doc__ += """ Maps Home Assistant speed \
values to percentage based HomeKit speeds.
start: Start of the range (inclusive).
target: Percentage to use to determine HomeKit percentages \
from HomeAssistant speed.
"""
class HomeKitSpeedMapping:
"""Supports conversion between Home Assistant and HomeKit fan speeds."""
def __init__(self, speed_list):
"""Initialize a new SpeedMapping object."""
if speed_list[0] != fan.SPEED_OFF:
_LOGGER.warning(
"%s does not contain the speed setting "
"%s as its first element. "
"Assuming that %s is equivalent to 'off'",
speed_list,
fan.SPEED_OFF,
speed_list[0],
)
self.speed_ranges = OrderedDict()
list_size = len(speed_list)
for index, speed in enumerate(speed_list):
# By dividing by list_size -1 the following
# desired attributes hold true:
# * index = 0 => 0%, equal to "off"
# * index = len(speed_list) - 1 => 100 %
# * all other indices are equally distributed
target = index * 100 / (list_size - 1)
start = index * 100 / list_size
self.speed_ranges[speed] = SpeedRange(start, target)
def speed_to_homekit(self, speed):
"""Map Home Assistant speed state to HomeKit speed."""
if speed is None:
return None
speed_range = self.speed_ranges[speed]
return round(speed_range.target)
def speed_to_states(self, speed):
"""Map HomeKit speed to Home Assistant speed state."""
for state, speed_range in reversed(self.speed_ranges.items()):
if speed_range.start <= speed:
return state
return list(self.speed_ranges)[0]
def show_setup_message(hass, entry_id, bridge_name, pincode, uri): def show_setup_message(hass, entry_id, bridge_name, pincode, uri):
"""Display persistent notification with setup information.""" """Display persistent notification with setup information."""
pin = pincode.decode() pin = pincode.decode()

View File

@ -1,6 +1,5 @@
"""Test different accessory types: Fans.""" """Test different accessory types: Fans."""
from collections import namedtuple from collections import namedtuple
from unittest.mock import Mock
from pyhap.const import HAP_REPR_AID, HAP_REPR_CHARS, HAP_REPR_IID, HAP_REPR_VALUE from pyhap.const import HAP_REPR_AID, HAP_REPR_CHARS, HAP_REPR_IID, HAP_REPR_VALUE
import pytest import pytest
@ -8,20 +7,15 @@ import pytest
from homeassistant.components.fan import ( from homeassistant.components.fan import (
ATTR_DIRECTION, ATTR_DIRECTION,
ATTR_OSCILLATING, ATTR_OSCILLATING,
ATTR_SPEED, ATTR_PERCENTAGE,
ATTR_SPEED_LIST,
DIRECTION_FORWARD, DIRECTION_FORWARD,
DIRECTION_REVERSE, DIRECTION_REVERSE,
DOMAIN, DOMAIN,
SPEED_HIGH,
SPEED_LOW,
SPEED_OFF,
SUPPORT_DIRECTION, SUPPORT_DIRECTION,
SUPPORT_OSCILLATE, SUPPORT_OSCILLATE,
SUPPORT_SET_SPEED, SUPPORT_SET_SPEED,
) )
from homeassistant.components.homekit.const import ATTR_VALUE from homeassistant.components.homekit.const import ATTR_VALUE
from homeassistant.components.homekit.util import HomeKitSpeedMapping
from homeassistant.const import ( from homeassistant.const import (
ATTR_ENTITY_ID, ATTR_ENTITY_ID,
ATTR_SUPPORTED_FEATURES, ATTR_SUPPORTED_FEATURES,
@ -266,15 +260,13 @@ async def test_fan_oscillate(hass, hk_driver, cls, events):
async def test_fan_speed(hass, hk_driver, cls, events): async def test_fan_speed(hass, hk_driver, cls, events):
"""Test fan with speed.""" """Test fan with speed."""
entity_id = "fan.demo" entity_id = "fan.demo"
speed_list = [SPEED_OFF, SPEED_LOW, SPEED_HIGH]
hass.states.async_set( hass.states.async_set(
entity_id, entity_id,
STATE_ON, STATE_ON,
{ {
ATTR_SUPPORTED_FEATURES: SUPPORT_SET_SPEED, ATTR_SUPPORTED_FEATURES: SUPPORT_SET_SPEED,
ATTR_SPEED: SPEED_OFF, ATTR_PERCENTAGE: 0,
ATTR_SPEED_LIST: speed_list,
}, },
) )
await hass.async_block_till_done() await hass.async_block_till_done()
@ -288,20 +280,12 @@ async def test_fan_speed(hass, hk_driver, cls, events):
await acc.run_handler() await acc.run_handler()
await hass.async_block_till_done() await hass.async_block_till_done()
assert ( hass.states.async_set(entity_id, STATE_ON, {ATTR_PERCENTAGE: 100})
acc.speed_mapping.speed_ranges == HomeKitSpeedMapping(speed_list).speed_ranges
)
acc.speed_mapping.speed_to_homekit = Mock(return_value=42)
acc.speed_mapping.speed_to_states = Mock(return_value="ludicrous")
hass.states.async_set(entity_id, STATE_ON, {ATTR_SPEED: SPEED_HIGH})
await hass.async_block_till_done() await hass.async_block_till_done()
acc.speed_mapping.speed_to_homekit.assert_called_with(SPEED_HIGH) assert acc.char_speed.value == 100
assert acc.char_speed.value == 42
# Set from HomeKit # Set from HomeKit
call_set_speed = async_mock_service(hass, DOMAIN, "set_speed") call_set_percentage = async_mock_service(hass, DOMAIN, "set_percentage")
char_speed_iid = acc.char_speed.to_HAP()[HAP_REPR_IID] char_speed_iid = acc.char_speed.to_HAP()[HAP_REPR_IID]
char_active_iid = acc.char_active.to_HAP()[HAP_REPR_IID] char_active_iid = acc.char_active.to_HAP()[HAP_REPR_IID]
@ -320,18 +304,17 @@ async def test_fan_speed(hass, hk_driver, cls, events):
) )
await hass.async_add_executor_job(acc.char_speed.client_update_value, 42) await hass.async_add_executor_job(acc.char_speed.client_update_value, 42)
await hass.async_block_till_done() await hass.async_block_till_done()
acc.speed_mapping.speed_to_states.assert_called_with(42)
assert acc.char_speed.value == 42 assert acc.char_speed.value == 42
assert acc.char_active.value == 1 assert acc.char_active.value == 1
assert call_set_speed[0] assert call_set_percentage[0]
assert call_set_speed[0].data[ATTR_ENTITY_ID] == entity_id assert call_set_percentage[0].data[ATTR_ENTITY_ID] == entity_id
assert call_set_speed[0].data[ATTR_SPEED] == "ludicrous" assert call_set_percentage[0].data[ATTR_PERCENTAGE] == 42
assert len(events) == 1 assert len(events) == 1
assert events[-1].data[ATTR_VALUE] == "ludicrous" assert events[-1].data[ATTR_VALUE] == 42
# Verify speed is preserved from off to on # Verify speed is preserved from off to on
hass.states.async_set(entity_id, STATE_OFF, {ATTR_SPEED: SPEED_OFF}) hass.states.async_set(entity_id, STATE_OFF, {ATTR_PERCENTAGE: 42})
await hass.async_block_till_done() await hass.async_block_till_done()
assert acc.char_speed.value == 42 assert acc.char_speed.value == 42
assert acc.char_active.value == 0 assert acc.char_active.value == 0
@ -356,7 +339,6 @@ async def test_fan_speed(hass, hk_driver, cls, events):
async def test_fan_set_all_one_shot(hass, hk_driver, cls, events): async def test_fan_set_all_one_shot(hass, hk_driver, cls, events):
"""Test fan with speed.""" """Test fan with speed."""
entity_id = "fan.demo" entity_id = "fan.demo"
speed_list = [SPEED_OFF, SPEED_LOW, SPEED_HIGH]
hass.states.async_set( hass.states.async_set(
entity_id, entity_id,
@ -365,10 +347,9 @@ async def test_fan_set_all_one_shot(hass, hk_driver, cls, events):
ATTR_SUPPORTED_FEATURES: SUPPORT_SET_SPEED ATTR_SUPPORTED_FEATURES: SUPPORT_SET_SPEED
| SUPPORT_OSCILLATE | SUPPORT_OSCILLATE
| SUPPORT_DIRECTION, | SUPPORT_DIRECTION,
ATTR_SPEED: SPEED_OFF, ATTR_PERCENTAGE: 0,
ATTR_OSCILLATING: False, ATTR_OSCILLATING: False,
ATTR_DIRECTION: DIRECTION_FORWARD, ATTR_DIRECTION: DIRECTION_FORWARD,
ATTR_SPEED_LIST: speed_list,
}, },
) )
await hass.async_block_till_done() await hass.async_block_till_done()
@ -381,13 +362,6 @@ async def test_fan_set_all_one_shot(hass, hk_driver, cls, events):
await acc.run_handler() await acc.run_handler()
await hass.async_block_till_done() await hass.async_block_till_done()
assert (
acc.speed_mapping.speed_ranges == HomeKitSpeedMapping(speed_list).speed_ranges
)
acc.speed_mapping.speed_to_homekit = Mock(return_value=42)
acc.speed_mapping.speed_to_states = Mock(return_value="ludicrous")
hass.states.async_set( hass.states.async_set(
entity_id, entity_id,
STATE_OFF, STATE_OFF,
@ -395,17 +369,16 @@ async def test_fan_set_all_one_shot(hass, hk_driver, cls, events):
ATTR_SUPPORTED_FEATURES: SUPPORT_SET_SPEED ATTR_SUPPORTED_FEATURES: SUPPORT_SET_SPEED
| SUPPORT_OSCILLATE | SUPPORT_OSCILLATE
| SUPPORT_DIRECTION, | SUPPORT_DIRECTION,
ATTR_SPEED: SPEED_OFF, ATTR_PERCENTAGE: 0,
ATTR_OSCILLATING: False, ATTR_OSCILLATING: False,
ATTR_DIRECTION: DIRECTION_FORWARD, ATTR_DIRECTION: DIRECTION_FORWARD,
ATTR_SPEED_LIST: speed_list,
}, },
) )
await hass.async_block_till_done() await hass.async_block_till_done()
assert hass.states.get(entity_id).state == STATE_OFF assert hass.states.get(entity_id).state == STATE_OFF
# Set from HomeKit # Set from HomeKit
call_set_speed = async_mock_service(hass, DOMAIN, "set_speed") call_set_percentage = async_mock_service(hass, DOMAIN, "set_percentage")
call_oscillate = async_mock_service(hass, DOMAIN, "oscillate") call_oscillate = async_mock_service(hass, DOMAIN, "oscillate")
call_set_direction = async_mock_service(hass, DOMAIN, "set_direction") call_set_direction = async_mock_service(hass, DOMAIN, "set_direction")
call_turn_on = async_mock_service(hass, DOMAIN, "turn_on") call_turn_on = async_mock_service(hass, DOMAIN, "turn_on")
@ -444,11 +417,10 @@ async def test_fan_set_all_one_shot(hass, hk_driver, cls, events):
"mock_addr", "mock_addr",
) )
await hass.async_block_till_done() await hass.async_block_till_done()
acc.speed_mapping.speed_to_states.assert_called_with(42)
assert not call_turn_on assert not call_turn_on
assert call_set_speed[0] assert call_set_percentage[0]
assert call_set_speed[0].data[ATTR_ENTITY_ID] == entity_id assert call_set_percentage[0].data[ATTR_ENTITY_ID] == entity_id
assert call_set_speed[0].data[ATTR_SPEED] == "ludicrous" assert call_set_percentage[0].data[ATTR_PERCENTAGE] == 42
assert call_oscillate[0] assert call_oscillate[0]
assert call_oscillate[0].data[ATTR_ENTITY_ID] == entity_id assert call_oscillate[0].data[ATTR_ENTITY_ID] == entity_id
assert call_oscillate[0].data[ATTR_OSCILLATING] is True assert call_oscillate[0].data[ATTR_OSCILLATING] is True
@ -459,7 +431,7 @@ async def test_fan_set_all_one_shot(hass, hk_driver, cls, events):
assert events[0].data[ATTR_VALUE] is True assert events[0].data[ATTR_VALUE] is True
assert events[1].data[ATTR_VALUE] == DIRECTION_REVERSE assert events[1].data[ATTR_VALUE] == DIRECTION_REVERSE
assert events[2].data[ATTR_VALUE] == "ludicrous" assert events[2].data[ATTR_VALUE] == 42
hass.states.async_set( hass.states.async_set(
entity_id, entity_id,
@ -468,10 +440,9 @@ async def test_fan_set_all_one_shot(hass, hk_driver, cls, events):
ATTR_SUPPORTED_FEATURES: SUPPORT_SET_SPEED ATTR_SUPPORTED_FEATURES: SUPPORT_SET_SPEED
| SUPPORT_OSCILLATE | SUPPORT_OSCILLATE
| SUPPORT_DIRECTION, | SUPPORT_DIRECTION,
ATTR_SPEED: SPEED_OFF, ATTR_PERCENTAGE: 0,
ATTR_OSCILLATING: False, ATTR_OSCILLATING: False,
ATTR_DIRECTION: DIRECTION_FORWARD, ATTR_DIRECTION: DIRECTION_FORWARD,
ATTR_SPEED_LIST: speed_list,
}, },
) )
await hass.async_block_till_done() await hass.async_block_till_done()
@ -506,11 +477,10 @@ async def test_fan_set_all_one_shot(hass, hk_driver, cls, events):
# Turn on should not be called if its already on # Turn on should not be called if its already on
# and we set a fan speed # and we set a fan speed
await hass.async_block_till_done() await hass.async_block_till_done()
acc.speed_mapping.speed_to_states.assert_called_with(42)
assert len(events) == 6 assert len(events) == 6
assert call_set_speed[1] assert call_set_percentage[1]
assert call_set_speed[1].data[ATTR_ENTITY_ID] == entity_id assert call_set_percentage[1].data[ATTR_ENTITY_ID] == entity_id
assert call_set_speed[1].data[ATTR_SPEED] == "ludicrous" assert call_set_percentage[1].data[ATTR_PERCENTAGE] == 42
assert call_oscillate[1] assert call_oscillate[1]
assert call_oscillate[1].data[ATTR_ENTITY_ID] == entity_id assert call_oscillate[1].data[ATTR_ENTITY_ID] == entity_id
assert call_oscillate[1].data[ATTR_OSCILLATING] is True assert call_oscillate[1].data[ATTR_OSCILLATING] is True
@ -520,7 +490,7 @@ async def test_fan_set_all_one_shot(hass, hk_driver, cls, events):
assert events[-3].data[ATTR_VALUE] is True assert events[-3].data[ATTR_VALUE] is True
assert events[-2].data[ATTR_VALUE] == DIRECTION_REVERSE assert events[-2].data[ATTR_VALUE] == DIRECTION_REVERSE
assert events[-1].data[ATTR_VALUE] == "ludicrous" assert events[-1].data[ATTR_VALUE] == 42
hk_driver.set_characteristics( hk_driver.set_characteristics(
{ {
@ -554,7 +524,7 @@ async def test_fan_set_all_one_shot(hass, hk_driver, cls, events):
assert len(events) == 7 assert len(events) == 7
assert call_turn_off assert call_turn_off
assert call_turn_off[0].data[ATTR_ENTITY_ID] == entity_id assert call_turn_off[0].data[ATTR_ENTITY_ID] == entity_id
assert len(call_set_speed) == 2 assert len(call_set_percentage) == 2
assert len(call_oscillate) == 2 assert len(call_oscillate) == 2
assert len(call_set_direction) == 2 assert len(call_set_direction) == 2

View File

@ -21,8 +21,6 @@ from homeassistant.components.homekit.const import (
TYPE_VALVE, TYPE_VALVE,
) )
from homeassistant.components.homekit.util import ( from homeassistant.components.homekit.util import (
HomeKitSpeedMapping,
SpeedRange,
cleanup_name_for_homekit, cleanup_name_for_homekit,
convert_to_float, convert_to_float,
density_to_air_quality, density_to_air_quality,
@ -251,63 +249,6 @@ async def test_dismiss_setup_msg(hass):
assert call_dismiss_notification[0].data[ATTR_NOTIFICATION_ID] == "entry_id" assert call_dismiss_notification[0].data[ATTR_NOTIFICATION_ID] == "entry_id"
def test_homekit_speed_mapping():
"""Test if the SpeedRanges from a speed_list are as expected."""
# A standard 2-speed fan
speed_mapping = HomeKitSpeedMapping(["off", "low", "high"])
assert speed_mapping.speed_ranges == {
"off": SpeedRange(0, 0),
"low": SpeedRange(100 / 3, 50),
"high": SpeedRange(200 / 3, 100),
}
# A standard 3-speed fan
speed_mapping = HomeKitSpeedMapping(["off", "low", "medium", "high"])
assert speed_mapping.speed_ranges == {
"off": SpeedRange(0, 0),
"low": SpeedRange(100 / 4, 100 / 3),
"medium": SpeedRange(200 / 4, 200 / 3),
"high": SpeedRange(300 / 4, 100),
}
# a Dyson-like fan with 10 speeds
speed_mapping = HomeKitSpeedMapping([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
assert speed_mapping.speed_ranges == {
0: SpeedRange(0, 0),
1: SpeedRange(10, 100 / 9),
2: SpeedRange(20, 200 / 9),
3: SpeedRange(30, 300 / 9),
4: SpeedRange(40, 400 / 9),
5: SpeedRange(50, 500 / 9),
6: SpeedRange(60, 600 / 9),
7: SpeedRange(70, 700 / 9),
8: SpeedRange(80, 800 / 9),
9: SpeedRange(90, 100),
}
def test_speed_to_homekit():
"""Test speed conversion from HA to Homekit."""
speed_mapping = HomeKitSpeedMapping(["off", "low", "high"])
assert speed_mapping.speed_to_homekit(None) is None
assert speed_mapping.speed_to_homekit("off") == 0
assert speed_mapping.speed_to_homekit("low") == 50
assert speed_mapping.speed_to_homekit("high") == 100
def test_speed_to_states():
"""Test speed conversion from Homekit to HA."""
speed_mapping = HomeKitSpeedMapping(["off", "low", "high"])
assert speed_mapping.speed_to_states(-1) == "off"
assert speed_mapping.speed_to_states(0) == "off"
assert speed_mapping.speed_to_states(33) == "off"
assert speed_mapping.speed_to_states(34) == "low"
assert speed_mapping.speed_to_states(50) == "low"
assert speed_mapping.speed_to_states(66) == "low"
assert speed_mapping.speed_to_states(67) == "high"
assert speed_mapping.speed_to_states(100) == "high"
async def test_port_is_available(hass): async def test_port_is_available(hass):
"""Test we can get an available port and it is actually available.""" """Test we can get an available port and it is actually available."""
next_port = await hass.async_add_executor_job( next_port = await hass.async_add_executor_job(