Refactor async_turn_on() for ZHA Light. (#21156)

* Refactor async_turn_on() for ZHA Light.

Use "move_to_level_with_on_off" if brightness or transition attributes
are present in the service call data, otherwise issue "On" Zigbee
command.
Allow brightness of 0 for service call -- effectively turning the light
off.
Send color commands only after the light was turned on.

* Fix zha.light tests.
This commit is contained in:
Alexei Chetroi 2019-02-27 08:34:38 -05:00 committed by David F. Mulcahey
parent 27e6c6665f
commit 9066609d23
2 changed files with 92 additions and 49 deletions

View File

@ -20,7 +20,7 @@ _LOGGER = logging.getLogger(__name__)
DEPENDENCIES = ['zha'] DEPENDENCIES = ['zha']
DEFAULT_DURATION = 0.5 DEFAULT_DURATION = 5
CAPABILITIES_COLOR_XY = 0x08 CAPABILITIES_COLOR_XY = 0x08
CAPABILITIES_COLOR_TEMP = 0x10 CAPABILITIES_COLOR_TEMP = 0x10
@ -110,8 +110,13 @@ class Light(ZhaEntity, light.Light):
return self.state_attributes return self.state_attributes
def set_level(self, value): def set_level(self, value):
"""Set the brightness of this light between 0..255.""" """Set the brightness of this light between 0..254.
value = max(0, min(255, value))
brightness level 255 is a special value instructing the device to come
on at `on_level` Zigbee attribute value, regardless of the last set
level
"""
value = max(0, min(254, value))
self._brightness = value self._brightness = value
self.async_schedule_update_ha_state() self.async_schedule_update_ha_state()
@ -146,8 +151,31 @@ class Light(ZhaEntity, light.Light):
async def async_turn_on(self, **kwargs): async def async_turn_on(self, **kwargs):
"""Turn the entity on.""" """Turn the entity on."""
duration = kwargs.get(light.ATTR_TRANSITION, DEFAULT_DURATION) transition = kwargs.get(light.ATTR_TRANSITION)
duration = duration * 10 # tenths of s duration = transition * 10 if transition else DEFAULT_DURATION
brightness = kwargs.get(light.ATTR_BRIGHTNESS)
if (brightness is not None or transition) and \
self._supported_features & light.SUPPORT_BRIGHTNESS:
if brightness is not None:
level = min(254, brightness)
else:
level = self._brightness or 254
success = await self._level_channel.move_to_level_with_on_off(
level,
duration
)
if not success:
return
self._state = bool(level)
if level:
self._brightness = level
if brightness is None or brightness:
success = await self._on_off_channel.on()
if not success:
return
self._state = True
if light.ATTR_COLOR_TEMP in kwargs and \ if light.ATTR_COLOR_TEMP in kwargs and \
self.supported_features & light.SUPPORT_COLOR_TEMP: self.supported_features & light.SUPPORT_COLOR_TEMP:
@ -171,32 +199,12 @@ class Light(ZhaEntity, light.Light):
return return
self._hs_color = hs_color self._hs_color = hs_color
if self._brightness is not None:
brightness = kwargs.get(
light.ATTR_BRIGHTNESS, self._brightness or 255)
success = await self._level_channel.move_to_level_with_on_off(
brightness,
duration
)
if not success:
return
self._state = True
self._brightness = brightness
self.async_schedule_update_ha_state()
return
success = await self._on_off_channel.on()
if not success:
return
self._state = True
self.async_schedule_update_ha_state() self.async_schedule_update_ha_state()
async def async_turn_off(self, **kwargs): async def async_turn_off(self, **kwargs):
"""Turn the entity off.""" """Turn the entity off."""
duration = kwargs.get(light.ATTR_TRANSITION) duration = kwargs.get(light.ATTR_TRANSITION)
supports_level = self.supported_features & light.SUPPORT_BRIGHTNESS supports_level = self.supported_features & light.SUPPORT_BRIGHTNESS
success = None
if duration and supports_level: if duration and supports_level:
success = await self._level_channel.move_to_level_with_on_off( success = await self._level_channel.move_to_level_with_on_off(
0, 0,

View File

@ -1,20 +1,24 @@
"""Test zha light.""" """Test zha light."""
from unittest.mock import call, patch import asyncio
from unittest.mock import MagicMock, call, patch, sentinel
from homeassistant.components.light import DOMAIN from homeassistant.components.light import DOMAIN
from homeassistant.const import STATE_ON, STATE_OFF, STATE_UNAVAILABLE from homeassistant.const import STATE_OFF, STATE_ON, STATE_UNAVAILABLE
from tests.common import mock_coro
from .common import ( from .common import (
async_init_zigpy_device, make_attribute, make_entity_id, async_enable_traffic, async_init_zigpy_device, async_test_device_join,
async_test_device_join, async_enable_traffic make_attribute, make_entity_id)
)
from tests.common import mock_coro
ON = 1 ON = 1
OFF = 0 OFF = 0
async def test_light(hass, config_entry, zha_gateway): async def test_light(hass, config_entry, zha_gateway, monkeypatch):
"""Test zha light platform.""" """Test zha light platform."""
from zigpy.zcl.clusters.general import OnOff, LevelControl, Basic from zigpy.zcl.clusters.general import OnOff, LevelControl, Basic
from zigpy.zcl.foundation import Status
from zigpy.profiles.zha import DeviceType from zigpy.profiles.zha import DeviceType
# create zigpy devices # create zigpy devices
@ -52,6 +56,12 @@ async def test_light(hass, config_entry, zha_gateway):
# dimmable light # dimmable light
level_device_on_off_cluster = zigpy_device_level.endpoints.get(1).on_off level_device_on_off_cluster = zigpy_device_level.endpoints.get(1).on_off
level_device_level_cluster = zigpy_device_level.endpoints.get(1).level level_device_level_cluster = zigpy_device_level.endpoints.get(1).level
on_off_mock = MagicMock(side_effect=asyncio.coroutine(MagicMock(
return_value=(sentinel.data, Status.SUCCESS))))
level_mock = MagicMock(side_effect=asyncio.coroutine(MagicMock(
return_value=(sentinel.data, Status.SUCCESS))))
monkeypatch.setattr(level_device_on_off_cluster, 'request', on_off_mock)
monkeypatch.setattr(level_device_level_cluster, 'request', level_mock)
level_entity_id = make_entity_id(DOMAIN, zigpy_device_level, level_entity_id = make_entity_id(DOMAIN, zigpy_device_level,
level_device_on_off_cluster, level_device_on_off_cluster,
use_suffix=False) use_suffix=False)
@ -81,7 +91,8 @@ async def test_light(hass, config_entry, zha_gateway):
hass, on_off_device_on_off_cluster, on_off_entity_id) hass, on_off_device_on_off_cluster, on_off_entity_id)
await async_test_level_on_off_from_hass( await async_test_level_on_off_from_hass(
hass, level_device_on_off_cluster, level_entity_id) hass, level_device_on_off_cluster, level_device_level_cluster,
level_entity_id)
# test turning the lights on and off from the light # test turning the lights on and off from the light
await async_test_on_from_light( await async_test_on_from_light(
@ -131,7 +142,7 @@ async def async_test_on_off_from_hass(hass, cluster, entity_id):
await hass.services.async_call(DOMAIN, 'turn_on', { await hass.services.async_call(DOMAIN, 'turn_on', {
'entity_id': entity_id 'entity_id': entity_id
}, blocking=True) }, blocking=True)
assert len(cluster.request.mock_calls) == 1 assert cluster.request.call_count == 1
assert cluster.request.call_args == call( assert cluster.request.call_args == call(
False, ON, (), expect_reply=True, manufacturer=None) False, ON, (), expect_reply=True, manufacturer=None)
@ -148,28 +159,52 @@ async def async_test_off_from_hass(hass, cluster, entity_id):
await hass.services.async_call(DOMAIN, 'turn_off', { await hass.services.async_call(DOMAIN, 'turn_off', {
'entity_id': entity_id 'entity_id': entity_id
}, blocking=True) }, blocking=True)
assert len(cluster.request.mock_calls) == 1 assert cluster.request.call_count == 1
assert cluster.request.call_args == call( assert cluster.request.call_args == call(
False, OFF, (), expect_reply=True, manufacturer=None) False, OFF, (), expect_reply=True, manufacturer=None)
async def async_test_level_on_off_from_hass(hass, cluster, entity_id): async def async_test_level_on_off_from_hass(hass, on_off_cluster,
level_cluster, entity_id):
"""Test on off functionality from hass.""" """Test on off functionality from hass."""
from zigpy import types from zigpy import types
from zigpy.zcl.foundation import Status # turn on via UI
with patch( await hass.services.async_call(DOMAIN, 'turn_on', {'entity_id': entity_id},
'zigpy.zcl.Cluster.request', blocking=True)
return_value=mock_coro([Status.SUCCESS, Status.SUCCESS])): assert on_off_cluster.request.call_count == 1
# turn on via UI assert level_cluster.request.call_count == 0
await hass.services.async_call(DOMAIN, 'turn_on', { assert on_off_cluster.request.call_args == call(
'entity_id': entity_id False, 1, (), expect_reply=True, manufacturer=None)
}, blocking=True) on_off_cluster.request.reset_mock()
assert len(cluster.request.mock_calls) == 1 level_cluster.request.reset_mock()
assert cluster.request.call_args == call(
False, 4, (types.uint8_t, types.uint16_t), 255, 5.0,
expect_reply=True, manufacturer=None)
await async_test_off_from_hass(hass, cluster, entity_id) await hass.services.async_call(DOMAIN, 'turn_on',
{'entity_id': entity_id, 'transition': 10},
blocking=True)
assert on_off_cluster.request.call_count == 1
assert level_cluster.request.call_count == 1
assert on_off_cluster.request.call_args == call(
False, 1, (), expect_reply=True, manufacturer=None)
assert level_cluster.request.call_args == call(
False, 4, (types.uint8_t, types.uint16_t), 254, 100.0,
expect_reply=True, manufacturer=None)
on_off_cluster.request.reset_mock()
level_cluster.request.reset_mock()
await hass.services.async_call(DOMAIN, 'turn_on',
{'entity_id': entity_id, 'brightness': 10},
blocking=True)
assert on_off_cluster.request.call_count == 1
assert level_cluster.request.call_count == 1
assert on_off_cluster.request.call_args == call(
False, 1, (), expect_reply=True, manufacturer=None)
assert level_cluster.request.call_args == call(
False, 4, (types.uint8_t, types.uint16_t), 10, 5.0,
expect_reply=True, manufacturer=None)
on_off_cluster.request.reset_mock()
level_cluster.request.reset_mock()
await async_test_off_from_hass(hass, on_off_cluster, entity_id)
async def async_test_dimmer_from_light(hass, cluster, entity_id, async def async_test_dimmer_from_light(hass, cluster, entity_id,