Add modbus light brightness and color temperature (#139703)

Co-authored-by: Abílio Costa <abmantis@users.noreply.github.com>
This commit is contained in:
Dmytro Tkach 2025-05-14 14:07:19 +03:00 committed by GitHub
parent 48520d90ef
commit 4f723232e3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 419 additions and 24 deletions

View File

@ -62,8 +62,10 @@ from .const import (
CALL_TYPE_X_COILS,
CALL_TYPE_X_REGISTER_HOLDINGS,
CONF_BAUDRATE,
CONF_BRIGHTNESS_REGISTER,
CONF_BYTESIZE,
CONF_CLIMATES,
CONF_COLOR_TEMP_REGISTER,
CONF_DATA_TYPE,
CONF_DEVICE_ADDRESS,
CONF_FAN_MODE_AUTO,
@ -415,7 +417,14 @@ SWITCH_SCHEMA = BASE_SWITCH_SCHEMA.extend(
}
)
LIGHT_SCHEMA = BASE_SWITCH_SCHEMA.extend({})
LIGHT_SCHEMA = BASE_SWITCH_SCHEMA.extend(
{
vol.Optional(CONF_BRIGHTNESS_REGISTER): cv.positive_int,
vol.Optional(CONF_COLOR_TEMP_REGISTER): cv.positive_int,
vol.Optional(CONF_MIN_TEMP): cv.positive_int,
vol.Optional(CONF_MAX_TEMP): cv.positive_int,
}
)
FAN_SCHEMA = BASE_SWITCH_SCHEMA.extend({})

View File

@ -16,6 +16,8 @@ from homeassistant.const import (
CONF_BAUDRATE = "baudrate"
CONF_BYTESIZE = "bytesize"
CONF_CLIMATES = "climates"
CONF_BRIGHTNESS_REGISTER = "brightness_address"
CONF_COLOR_TEMP_REGISTER = "color_temp_address"
CONF_DATA_TYPE = "data_type"
CONF_DEVICE_ADDRESS = "device_address"
CONF_FANS = "fans"
@ -167,3 +169,11 @@ PLATFORMS = (
(Platform.SENSOR, CONF_SENSORS),
(Platform.SWITCH, CONF_SWITCHES),
)
LIGHT_DEFAULT_MIN_KELVIN = 2000
LIGHT_DEFAULT_MAX_KELVIN = 7000
LIGHT_MIN_BRIGHTNESS = 0
LIGHT_MAX_BRIGHTNESS = 255
LIGHT_MODBUS_SCALE_MIN = 0
LIGHT_MODBUS_SCALE_MAX = 100
LIGHT_MODBUS_INVALID_VALUE = 0xFFFF

View File

@ -2,18 +2,40 @@
from __future__ import annotations
import logging
from typing import Any
from homeassistant.components.light import ColorMode, LightEntity
from homeassistant.components.light import (
ATTR_BRIGHTNESS,
ATTR_COLOR_TEMP_KELVIN,
ColorMode,
LightEntity,
)
from homeassistant.const import CONF_LIGHTS, CONF_NAME
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from . import get_hub
from .const import (
CALL_TYPE_REGISTER_HOLDING,
CALL_TYPE_WRITE_REGISTER,
CONF_BRIGHTNESS_REGISTER,
CONF_COLOR_TEMP_REGISTER,
CONF_MAX_TEMP,
CONF_MIN_TEMP,
LIGHT_DEFAULT_MAX_KELVIN,
LIGHT_DEFAULT_MIN_KELVIN,
LIGHT_MAX_BRIGHTNESS,
LIGHT_MODBUS_INVALID_VALUE,
LIGHT_MODBUS_SCALE_MAX,
LIGHT_MODBUS_SCALE_MIN,
)
from .entity import BaseSwitch
from .modbus import ModbusHub
PARALLEL_UPDATES = 1
_LOGGER = logging.getLogger(__name__)
async def async_setup_platform(
@ -32,9 +54,176 @@ async def async_setup_platform(
class ModbusLight(BaseSwitch, LightEntity):
"""Class representing a Modbus light."""
_attr_color_mode = ColorMode.ONOFF
_attr_supported_color_modes = {ColorMode.ONOFF}
def __init__(
self, hass: HomeAssistant, hub: ModbusHub, config: dict[str, Any]
) -> None:
"""Initialize the Modbus light entity."""
super().__init__(hass, hub, config)
self._brightness_address: int | None = config.get(CONF_BRIGHTNESS_REGISTER)
self._color_temp_address: int | None = config.get(CONF_COLOR_TEMP_REGISTER)
# Determine color mode dynamically
self._attr_color_mode = self._detect_color_mode(config)
self._attr_supported_color_modes = {self._attr_color_mode}
# Set min/max kelvin values if the mode is COLOR_TEMP
if self._attr_color_mode == ColorMode.COLOR_TEMP:
self._attr_min_color_temp_kelvin = config.get(
CONF_MIN_TEMP, LIGHT_DEFAULT_MIN_KELVIN
)
self._attr_max_color_temp_kelvin = config.get(
CONF_MAX_TEMP, LIGHT_DEFAULT_MAX_KELVIN
)
async def async_added_to_hass(self) -> None:
"""Handle entity which will be added."""
await super().async_added_to_hass()
if (state := await self.async_get_last_state()) is None:
return
if (brightness := state.attributes.get(ATTR_BRIGHTNESS)) is not None:
self._attr_brightness = brightness
if (color_temp := state.attributes.get(ATTR_COLOR_TEMP_KELVIN)) is not None:
self._attr_color_temp_kelvin = color_temp
@staticmethod
def _detect_color_mode(config: dict[str, Any]) -> ColorMode:
"""Determine the appropriate color mode for the light based on configuration."""
if CONF_COLOR_TEMP_REGISTER in config:
return ColorMode.COLOR_TEMP
if CONF_BRIGHTNESS_REGISTER in config:
return ColorMode.BRIGHTNESS
return ColorMode.ONOFF
async def async_turn_on(self, **kwargs: Any) -> None:
"""Set light on."""
"""Turn light on and set brightness if provided."""
brightness = kwargs.get(ATTR_BRIGHTNESS)
if brightness and isinstance(brightness, int):
await self.async_set_brightness(brightness)
color_temp = kwargs.get(ATTR_COLOR_TEMP_KELVIN)
if color_temp and isinstance(color_temp, int):
await self.async_set_color_temp(color_temp)
await self.async_turn(self.command_on)
async def async_turn_off(self, **kwargs: Any) -> None:
"""Turn light off."""
await self.async_turn(self._command_off)
async def async_set_brightness(self, brightness: int) -> None:
"""Set the brightness of the light."""
if not self._brightness_address:
return
conv_brightness = self._convert_brightness_to_modbus(brightness)
await self._hub.async_pb_call(
unit=self._slave,
address=self._brightness_address,
value=conv_brightness,
use_call=CALL_TYPE_WRITE_REGISTER,
)
if not self._verify_active:
self._attr_brightness = brightness
async def async_set_color_temp(self, color_temp_kelvin: int) -> None:
"""Send Modbus command to set color temperature."""
if not self._color_temp_address:
return
conv_color_temp_kelvin = self._convert_color_temp_to_modbus(color_temp_kelvin)
await self._hub.async_pb_call(
unit=self._slave,
address=self._color_temp_address,
value=conv_color_temp_kelvin,
use_call=CALL_TYPE_WRITE_REGISTER,
)
if not self._verify_active:
self._attr_color_temp_kelvin = color_temp_kelvin
async def _async_update(self) -> None:
"""Update the entity state, including brightness and color temperature."""
await super()._async_update()
if not self._verify_active:
return
if self._brightness_address:
brightness_result = await self._hub.async_pb_call(
unit=self._slave,
value=1,
address=self._brightness_address,
use_call=CALL_TYPE_REGISTER_HOLDING,
)
if (
brightness_result
and brightness_result.registers
and brightness_result.registers[0] != LIGHT_MODBUS_INVALID_VALUE
):
self._attr_brightness = self._convert_modbus_percent_to_brightness(
brightness_result.registers[0]
)
if self._color_temp_address:
color_result = await self._hub.async_pb_call(
unit=self._slave,
value=1,
address=self._color_temp_address,
use_call=CALL_TYPE_REGISTER_HOLDING,
)
if (
color_result
and color_result.registers
and color_result.registers[0] != LIGHT_MODBUS_INVALID_VALUE
):
self._attr_color_temp_kelvin = (
self._convert_modbus_percent_to_temperature(
color_result.registers[0]
)
)
@staticmethod
def _convert_modbus_percent_to_brightness(percent: int) -> int:
"""Convert Modbus scale (0-100) to the brightness (0-255)."""
return round(
percent
/ (LIGHT_MODBUS_SCALE_MAX - LIGHT_MODBUS_SCALE_MIN)
* LIGHT_MAX_BRIGHTNESS
)
def _convert_modbus_percent_to_temperature(self, percent: int) -> int:
"""Convert Modbus scale (0-100) to the color temperature in Kelvin (2000-7000 К)."""
assert isinstance(self._attr_min_color_temp_kelvin, int) and isinstance(
self._attr_max_color_temp_kelvin, int
)
return round(
self._attr_min_color_temp_kelvin
+ (
percent
/ (LIGHT_MODBUS_SCALE_MAX - LIGHT_MODBUS_SCALE_MIN)
* (self._attr_max_color_temp_kelvin - self._attr_min_color_temp_kelvin)
)
)
@staticmethod
def _convert_brightness_to_modbus(brightness: int) -> int:
"""Convert brightness (0-255) to Modbus scale (0-100)."""
return round(
brightness
/ LIGHT_MAX_BRIGHTNESS
* (LIGHT_MODBUS_SCALE_MAX - LIGHT_MODBUS_SCALE_MIN)
)
def _convert_color_temp_to_modbus(self, kelvin: int) -> int:
"""Convert color temperature from Kelvin to the Modbus scale (0-100)."""
assert isinstance(self._attr_min_color_temp_kelvin, int) and isinstance(
self._attr_max_color_temp_kelvin, int
)
return round(
LIGHT_MODBUS_SCALE_MIN
+ (kelvin - self._attr_min_color_temp_kelvin)
* (LIGHT_MODBUS_SCALE_MAX - LIGHT_MODBUS_SCALE_MIN)
/ (self._attr_max_color_temp_kelvin - self._attr_min_color_temp_kelvin)
)

View File

@ -4,12 +4,18 @@ from pymodbus.exceptions import ModbusException
import pytest
from homeassistant.components.homeassistant import SERVICE_UPDATE_ENTITY
from homeassistant.components.light import DOMAIN as LIGHT_DOMAIN
from homeassistant.components.light import (
ATTR_BRIGHTNESS,
ATTR_COLOR_TEMP_KELVIN,
DOMAIN as LIGHT_DOMAIN,
)
from homeassistant.components.modbus.const import (
CALL_TYPE_COIL,
CALL_TYPE_DISCRETE,
CALL_TYPE_REGISTER_HOLDING,
CALL_TYPE_REGISTER_INPUT,
CONF_BRIGHTNESS_REGISTER,
CONF_COLOR_TEMP_REGISTER,
CONF_DEVICE_ADDRESS,
CONF_INPUT_TYPE,
CONF_STATE_OFF,
@ -217,7 +223,23 @@ async def test_all_light(hass: HomeAssistant, mock_do_cycle, expected) -> None:
@pytest.mark.parametrize(
"mock_test_state",
[(State(ENTITY_ID, STATE_ON),)],
[
(
State(
ENTITY_ID,
STATE_ON,
{
ATTR_BRIGHTNESS: 128,
ATTR_COLOR_TEMP_KELVIN: 4000,
},
),
State(
ENTITY_ID2,
STATE_ON,
{},
),
)
],
indirect=True,
)
@pytest.mark.parametrize(
@ -229,16 +251,35 @@ async def test_all_light(hass: HomeAssistant, mock_do_cycle, expected) -> None:
CONF_NAME: TEST_ENTITY_NAME,
CONF_ADDRESS: 1234,
CONF_SCAN_INTERVAL: 0,
}
CONF_BRIGHTNESS_REGISTER: 1,
CONF_COLOR_TEMP_REGISTER: 2,
},
{
CONF_NAME: f"{TEST_ENTITY_NAME} 2",
CONF_ADDRESS: 1235,
CONF_SCAN_INTERVAL: 0,
},
]
},
}
],
)
async def test_restore_state_light(
hass: HomeAssistant, mock_test_state, mock_modbus
) -> None:
"""Run test for sensor restore state."""
assert hass.states.get(ENTITY_ID).state == mock_test_state[0].state
"""Test Modbus Light restore state with brightness and color_temp."""
state_1 = hass.states.get(ENTITY_ID)
state_2 = hass.states.get(ENTITY_ID2)
assert state_1.state == STATE_ON
assert state_1.attributes.get(ATTR_BRIGHTNESS) == mock_test_state[0].attributes.get(
ATTR_BRIGHTNESS
)
assert state_1.attributes.get(ATTR_COLOR_TEMP_KELVIN) == mock_test_state[
0
].attributes.get(ATTR_COLOR_TEMP_KELVIN)
assert state_2.state == STATE_ON
@pytest.mark.parametrize(
@ -271,7 +312,6 @@ async def test_light_service_turn(
"""Run test for service turn_on/turn_off."""
assert MODBUS_DOMAIN in hass.config.components
assert hass.states.get(ENTITY_ID).state == STATE_OFF
await hass.services.async_call(
LIGHT_DOMAIN, SERVICE_TURN_ON, service_data={ATTR_ENTITY_ID: ENTITY_ID}
@ -307,21 +347,143 @@ async def test_light_service_turn(
@pytest.mark.parametrize(
"do_config",
("do_config", "service_data", "expected_calls"),
[
{
CONF_LIGHTS: [
{
CONF_NAME: TEST_ENTITY_NAME,
CONF_ADDRESS: 1234,
CONF_WRITE_TYPE: CALL_TYPE_COIL,
CONF_VERIFY: {},
}
]
},
(
{
CONF_LIGHTS: [
{
CONF_NAME: TEST_ENTITY_NAME,
CONF_ADDRESS: 1234,
CONF_WRITE_TYPE: CALL_TYPE_REGISTER_HOLDING,
CONF_SCAN_INTERVAL: 0,
CONF_BRIGHTNESS_REGISTER: 1,
CONF_COLOR_TEMP_REGISTER: 2,
}
]
},
{ATTR_BRIGHTNESS: 128, ATTR_COLOR_TEMP_KELVIN: 2000},
[(1, 50), (2, 0)],
),
(
{
CONF_LIGHTS: [
{
CONF_NAME: TEST_ENTITY_NAME,
CONF_ADDRESS: 1234,
CONF_WRITE_TYPE: CALL_TYPE_REGISTER_HOLDING,
CONF_SCAN_INTERVAL: 0,
CONF_BRIGHTNESS_REGISTER: 1,
}
]
},
{ATTR_BRIGHTNESS: 256},
[(1, 100)],
),
(
{
CONF_LIGHTS: [
{
CONF_NAME: TEST_ENTITY_NAME,
CONF_ADDRESS: 1234,
CONF_WRITE_TYPE: CALL_TYPE_REGISTER_HOLDING,
CONF_SCAN_INTERVAL: 0,
CONF_COLOR_TEMP_REGISTER: 2,
}
]
},
{ATTR_BRIGHTNESS: 128, ATTR_COLOR_TEMP_KELVIN: 3000},
[(2, 20)],
),
],
)
async def test_service_light_update(hass: HomeAssistant, mock_modbus_ha) -> None:
async def test_color_temp_brightness_light(
hass: HomeAssistant,
mock_modbus_ha,
service_data,
expected_calls,
) -> None:
"""Test Modbus Light color temperature and brightness."""
assert hass.states.get(ENTITY_ID).state == STATE_OFF
await hass.services.async_call(
LIGHT_DOMAIN,
SERVICE_TURN_ON,
service_data={ATTR_ENTITY_ID: ENTITY_ID, **service_data},
blocking=True,
)
assert hass.states.get(ENTITY_ID).state == STATE_ON
calls = mock_modbus_ha.write_register.call_args_list
for expected_register, expected_value in expected_calls:
assert any(
call.args[0] == expected_register and call.kwargs["value"] == expected_value
for call in calls
), (
f"Expected register {expected_register} with value {expected_value} not found in calls {calls}"
)
await hass.services.async_call(
LIGHT_DOMAIN,
SERVICE_TURN_OFF,
service_data={ATTR_ENTITY_ID: ENTITY_ID},
blocking=True,
)
assert hass.states.get(ENTITY_ID).state == STATE_OFF
@pytest.mark.parametrize(
(
"do_config",
"input_output_values",
),
[
(
{
CONF_LIGHTS: [
{
CONF_NAME: TEST_ENTITY_NAME,
CONF_ADDRESS: 1234,
CONF_BRIGHTNESS_REGISTER: 1,
CONF_COLOR_TEMP_REGISTER: 2,
CONF_WRITE_TYPE: CALL_TYPE_COIL,
CONF_VERIFY: {},
},
]
},
[([100, 0], 255, 7000)],
),
(
{
CONF_LIGHTS: [
{
CONF_NAME: TEST_ENTITY_NAME,
CONF_ADDRESS: 1234,
CONF_BRIGHTNESS_REGISTER: 1,
CONF_WRITE_TYPE: CALL_TYPE_COIL,
CONF_VERIFY: {},
},
]
},
[([100, None], 255, None), ([0, None], 0, None)],
),
(
{
CONF_LIGHTS: [
{
CONF_NAME: TEST_ENTITY_NAME,
CONF_ADDRESS: 1234,
CONF_WRITE_TYPE: CALL_TYPE_COIL,
CONF_VERIFY: {},
},
]
},
[([None, None], None, None)],
),
],
)
async def test_service_light_update(
hass: HomeAssistant,
mock_modbus_ha,
input_output_values,
) -> None:
"""Run test for service homeassistant.update_entity."""
await hass.services.async_call(
HOMEASSISTANT_DOMAIN,
@ -338,6 +500,31 @@ async def test_service_light_update(hass: HomeAssistant, mock_modbus_ha) -> None
blocking=True,
)
assert hass.states.get(ENTITY_ID).state == STATE_ON
for (
register_values,
expected_brightness,
expected_color_temp,
) in input_output_values:
mock_modbus_ha.read_holding_registers.return_value = ReadResult(register_values)
await hass.services.async_call(
HOMEASSISTANT_DOMAIN,
SERVICE_UPDATE_ENTITY,
{
ATTR_ENTITY_ID: ENTITY_ID,
},
blocking=True,
)
assert (
expected_brightness is None
or hass.states.get(ENTITY_ID).attributes.get(ATTR_BRIGHTNESS)
== expected_brightness
)
assert (
expected_color_temp is None
or hass.states.get(ENTITY_ID).attributes.get(ATTR_COLOR_TEMP_KELVIN)
== expected_color_temp
)
assert hass
async def test_no_discovery_info_light(