add knx cover support (#7997)

* add knx cover

also corrected bugs in device config

1. overwriting of addresses in KNXMultiAddressDevice
2. setting and getting int values
3. added percentage scaling

* Update __init__.py
This commit is contained in:
Will W 2017-06-19 14:30:39 +09:00 committed by Paulus Schoutsen
parent 756768e745
commit 4fdde4f0e2
4 changed files with 190 additions and 5 deletions

View File

@ -227,6 +227,7 @@ omit =
homeassistant/components/climate/sensibo.py
homeassistant/components/cover/garadget.py
homeassistant/components/cover/homematic.py
homeassistant/components/cover/knx.py
homeassistant/components/cover/myq.py
homeassistant/components/cover/opengarage.py
homeassistant/components/cover/rpi_gpio.py

View File

@ -40,6 +40,8 @@ DEVICE_CLASSES = [
'garage', # Garage door control
]
DEVICE_CLASSES_SCHEMA = vol.All(vol.Lower, vol.In(DEVICE_CLASSES))
SUPPORT_OPEN = 1
SUPPORT_CLOSE = 2
SUPPORT_SET_POSITION = 4

View File

@ -0,0 +1,137 @@
"""
Support for KNX covers.
Tested with an MDT roller shutter
http://www.mdt.de/download/MDT_THB_Jalousieaktor_02.pdf
Example configuration:
cover:
- platform: knx
updown_address: 9/0/0
stop_address: 9/0/1
setposition_address: 9/0/3
getposition_address: 9/0/4
"""
import logging
import voluptuous as vol
from homeassistant.components.cover import (
CoverDevice, PLATFORM_SCHEMA, ATTR_POSITION, DEVICE_CLASSES_SCHEMA
)
from homeassistant.components.knx import (KNXConfig, KNXMultiAddressDevice)
from homeassistant.const import (CONF_NAME, CONF_DEVICE_CLASS)
import homeassistant.helpers.config_validation as cv
_LOGGER = logging.getLogger(__name__)
CONF_UPDOWN = 'updown_address'
CONF_STOP = 'stop_address'
CONF_SETPOSITION_ADDRESS = 'setposition_address'
CONF_GETPOSITION_ADDRESS = 'getposition_address'
DEFAULT_NAME = 'KNX Cover'
DEPENDENCIES = ['knx']
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_UPDOWN): cv.string,
vol.Required(CONF_STOP): cv.string,
vol.Optional(CONF_SETPOSITION_ADDRESS): cv.string,
vol.Optional(CONF_GETPOSITION_ADDRESS): cv.string,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_DEVICE_CLASS): DEVICE_CLASSES_SCHEMA
})
def setup_platform(hass, config, add_devices, discovery_info=None):
"""Create and add an entity based on the configuration."""
add_devices([KNXCover(hass, KNXConfig(config))])
class KNXCover(KNXMultiAddressDevice, CoverDevice):
"""Representation of a KNX cover. e.g. a rollershutter."""
def __init__(self, hass, config):
"""Initialize the cover."""
KNXMultiAddressDevice.__init__(
self, hass, config,
['updown', 'stop'], # required
optional=['setposition', 'getposition']
)
self._device_class = config.config.get(CONF_DEVICE_CLASS)
self._hass = hass
self._current_pos = None
self._target_pos = None
@property
def should_poll(self):
"""Polling is needed for the KNX cover."""
return True
@property
def is_closed(self):
"""Return if the cover is closed."""
if self.current_cover_position is not None:
if self.current_cover_position > 0:
return False
else:
return True
@property
def current_cover_position(self):
"""Return current position of cover.
None is unknown, 0 is closed, 100 is fully open.
"""
return self._current_pos
@property
def target_position(self):
"""Return the position we are trying to reach: 0 - 100."""
return self._target_pos
def set_cover_position(self, **kwargs):
"""Set new target position."""
position = kwargs.get(ATTR_POSITION)
if position is None:
return
self._target_pos = position
self.set_percentage('setposition', position)
_LOGGER.debug(
"%s: Set target position to %d",
self.name, position
)
def update(self):
"""Update device state."""
super().update()
value = self.get_percentage('getposition')
if value is not None:
self._current_pos = value
_LOGGER.debug(
"%s: position = %d",
self.name, value
)
def open_cover(self, **kwargs):
"""Open the cover."""
_LOGGER.debug("%s: open: updown = 0", self.name)
self.set_int_value('updown', 0)
def close_cover(self, **kwargs):
"""Close the cover."""
_LOGGER.debug("%s: open: updown = 1", self.name)
self.set_int_value('updown', 1)
def stop_cover(self, **kwargs):
"""Stop the cover movement."""
_LOGGER.debug("%s: stop: stop = 1", self.name)
self.set_int_value('stop', 1)
@property
def device_class(self):
"""Return the class of this device, from component DEVICE_CLASSES."""
return self._device_class

View File

@ -213,9 +213,6 @@ class KNXMultiAddressDevice(Entity):
to be controlled by multiple group addresses.
"""
names = {}
values = {}
def __init__(self, hass, config, required, optional=None):
"""Initialize the device.
@ -226,28 +223,34 @@ class KNXMultiAddressDevice(Entity):
"""
from knxip.core import parse_group_address, KNXException
self.names = {}
self.values = {}
self._config = config
self._state = False
self._data = None
_LOGGER.debug("Initalizing KNX multi address device")
settings = self._config.config
# parse required addresses
for name in required:
_LOGGER.info(name)
paramname = '{}{}'.format(name, '_address')
addr = self._config.config.get(paramname)
addr = settings.get(paramname)
if addr is None:
_LOGGER.exception(
"Required KNX group address %s missing", paramname)
raise KNXException(
"Group address for %s missing in configuration", paramname)
_LOGGER.debug("%s: %s=%s", settings.get('name'), paramname, addr)
addr = parse_group_address(addr)
self.names[addr] = name
# parse optional addresses
for name in optional:
paramname = '{}{}'.format(name, '_address')
addr = self._config.config.get(paramname)
addr = settings.get(paramname)
_LOGGER.debug("%s: %s=%s", settings.get('name'), paramname, addr)
if addr:
try:
addr = parse_group_address(addr)
@ -285,6 +288,48 @@ class KNXMultiAddressDevice(Entity):
return True
return False
def set_percentage(self, name, percentage):
"""Set a percentage in knx for a given attribute.
DPT_Scaling / DPT 5.001 is a single byte scaled percentage
"""
percentage = abs(percentage) # only accept positive values
scaled_value = percentage * 255 / 100
value = min(255, scaled_value)
self.set_int_value(name, value)
def get_percentage(self, name):
"""Get a percentage from knx for a given attribute.
DPT_Scaling / DPT 5.001 is a single byte scaled percentage
"""
value = self.get_int_value(name)
percentage = round(value * 100 / 255)
return percentage
def set_int_value(self, name, value, num_bytes=1):
"""Set an integer value for a given attribute."""
# KNX packets are big endian
value = round(value) # only accept integers
b_value = value.to_bytes(num_bytes, byteorder='big')
self.set_value(name, list(b_value))
def get_int_value(self, name):
"""Get an integer value for a given attribute."""
# KNX packets are big endian
summed_value = 0
raw_value = self.value(name)
try:
# convert raw value in bytes
for val in raw_value:
summed_value *= 256
summed_value += val
except TypeError:
# pknx returns a non-iterable type for unsuccessful reads
pass
return summed_value
def value(self, name):
"""Return the value to a given named attribute."""
from knxip.core import KNXException