From f26861976dee15ea0563914508adfe8f4ae0d6a8 Mon Sep 17 00:00:00 2001 From: Yannick POLLART Date: Thu, 22 Jun 2017 07:48:45 +0200 Subject: [PATCH] Rfxtrx binary sensor (#6794) * Added rfxtrx binary sensors to a new branch * binary_sensor/rfxtrx: added support for automatic_add * Fixed pylint warnings * off_delay is set wit clearer time specifiers (cv.time_period) * fire_event config attribute is now supported by rfxtrx binary sensors. * Cosmetic ordering * Fix lint errors for PR requirements. * Fixed indents, line length and comment problems. * Yet another line too long fix... * Using existing attributes and config constants. * Cosmetic fix (ATTR_DATABITS -> ATTR_DATA_BITS) * Removed unused attribute * FIX masked device id logging message * FIX line too long * FIX trailing white space * FIX: rfxtrx binary_sensor manages its own devices only. * Added a basic config helper for pt2262 devices * Make pylint happy * Fixed most houndci-bot-detected issues * Fix TOX complaint about blank line after function docstring * Fix data bit value calculation * Fixed line too long * Removed unnecessary code. * remove trailing whitespace * Added hass property to device object. --- .../components/binary_sensor/rfxtrx.py | 234 ++++++++++++++++++ homeassistant/components/rfxtrx.py | 104 +++++++- 2 files changed, 336 insertions(+), 2 deletions(-) create mode 100644 homeassistant/components/binary_sensor/rfxtrx.py diff --git a/homeassistant/components/binary_sensor/rfxtrx.py b/homeassistant/components/binary_sensor/rfxtrx.py new file mode 100644 index 00000000000..793f294bb55 --- /dev/null +++ b/homeassistant/components/binary_sensor/rfxtrx.py @@ -0,0 +1,234 @@ +""" +Support for RFXtrx binary sensors. + +Lighting4 devices (sensors based on PT2262 encoder) are supported and +tested. Other types may need some work. + +""" + +import logging +import voluptuous as vol +from homeassistant.components import rfxtrx +from homeassistant.util import slugify +from homeassistant.util import dt as dt_util +from homeassistant.helpers import config_validation as cv +from homeassistant.helpers import event as evt +from homeassistant.components.binary_sensor import BinarySensorDevice +from homeassistant.components.rfxtrx import ( + ATTR_AUTOMATIC_ADD, ATTR_NAME, ATTR_OFF_DELAY, ATTR_FIREEVENT, + ATTR_DATA_BITS, CONF_DEVICES +) +from homeassistant.const import ( + CONF_SENSOR_CLASS, CONF_COMMAND_ON, CONF_COMMAND_OFF +) + +DEPENDENCIES = ["rfxtrx"] + +_LOGGER = logging.getLogger(__name__) + +PLATFORM_SCHEMA = vol.Schema({ + vol.Required("platform"): rfxtrx.DOMAIN, + vol.Optional(CONF_DEVICES, default={}): vol.All( + dict, rfxtrx.valid_binary_sensor), + vol.Optional(ATTR_AUTOMATIC_ADD, default=False): cv.boolean, +}, extra=vol.ALLOW_EXTRA) + + +def setup_platform(hass, config, add_devices_callback, discovery_info=None): + """Setup the Binary Sensor platform to rfxtrx.""" + import RFXtrx as rfxtrxmod + sensors = [] + + for packet_id, entity in config['devices'].items(): + event = rfxtrx.get_rfx_object(packet_id) + device_id = slugify(event.device.id_string.lower()) + + if device_id in rfxtrx.RFX_DEVICES: + continue + + if entity[ATTR_DATA_BITS] is not None: + _LOGGER.info("Masked device id: %s", + rfxtrx.get_pt2262_deviceid(device_id, + entity[ATTR_DATA_BITS])) + + _LOGGER.info("Add %s rfxtrx.binary_sensor (class %s)", + entity[ATTR_NAME], entity[CONF_SENSOR_CLASS]) + + device = RfxtrxBinarySensor(event, entity[ATTR_NAME], + entity[CONF_SENSOR_CLASS], + entity[ATTR_FIREEVENT], + entity[ATTR_OFF_DELAY], + entity[ATTR_DATA_BITS], + entity[CONF_COMMAND_ON], + entity[CONF_COMMAND_OFF]) + device.hass = hass + sensors.append(device) + rfxtrx.RFX_DEVICES[device_id] = device + + add_devices_callback(sensors) + + # pylint: disable=too-many-branches + def binary_sensor_update(event): + """Callback for control updates from the RFXtrx gateway.""" + if not isinstance(event, rfxtrxmod.ControlEvent): + return + + device_id = slugify(event.device.id_string.lower()) + + if device_id in rfxtrx.RFX_DEVICES: + sensor = rfxtrx.RFX_DEVICES[device_id] + else: + sensor = rfxtrx.get_pt2262_device(device_id) + + if sensor is None: + # Add the entity if not exists and automatic_add is True + if not config[ATTR_AUTOMATIC_ADD]: + return + + poss_dev = rfxtrx.find_possible_pt2262_device(device_id) + + if poss_dev is not None: + poss_id = slugify(poss_dev.event.device.id_string.lower()) + _LOGGER.info("Found possible matching deviceid %s.", + poss_id) + + pkt_id = "".join("{0:02x}".format(x) for x in event.data) + sensor = RfxtrxBinarySensor(event, pkt_id) + rfxtrx.RFX_DEVICES[device_id] = sensor + add_devices_callback([sensor]) + _LOGGER.info("Added binary sensor %s " + "(Device_id: %s Class: %s Sub: %s)", + pkt_id, + slugify(event.device.id_string.lower()), + event.device.__class__.__name__, + event.device.subtype) + + elif not isinstance(sensor, RfxtrxBinarySensor): + return + else: + _LOGGER.info("Binary sensor update " + "(Device_id: %s Class: %s Sub: %s)", + slugify(event.device.id_string.lower()), + event.device.__class__.__name__, + event.device.subtype) + + if sensor.is_pt2262: + cmd = rfxtrx.get_pt2262_cmd(device_id, sensor.data_bits) + _LOGGER.info("applying cmd %s to device_id: %s)", + cmd, sensor.masked_id) + sensor.apply_cmd(int(cmd, 16)) + else: + if not sensor.is_on or sensor.should_fire_event: + sensor.update_state(True) + + if (sensor.is_on and sensor.off_delay is not None and + sensor.delay_listener is None): + + def off_delay_listener(now): + """Switch device off after a delay.""" + sensor.delay_listener = None + sensor.update_state(False) + + sensor.delay_listener = evt.track_point_in_time( + hass, off_delay_listener, dt_util.utcnow() + sensor.off_delay + ) + + # Subscribe to main rfxtrx events + if binary_sensor_update not in rfxtrx.RECEIVED_EVT_SUBSCRIBERS: + rfxtrx.RECEIVED_EVT_SUBSCRIBERS.append(binary_sensor_update) + + +# pylint: disable=too-many-instance-attributes,too-many-arguments +class RfxtrxBinarySensor(BinarySensorDevice): + """An Rfxtrx binary sensor.""" + + def __init__(self, event, name, sensor_class=None, + should_fire=False, off_delay=None, data_bits=None, + cmd_on=None, cmd_off=None): + """Initialize the sensor.""" + self.event = event + self._name = name + self._should_fire_event = should_fire + self._sensor_class = sensor_class + self._off_delay = off_delay + self._state = False + self.delay_listener = None + self._data_bits = data_bits + self._cmd_on = cmd_on + self._cmd_off = cmd_off + + if data_bits is not None: + self._masked_id = rfxtrx.get_pt2262_deviceid( + event.device.id_string.lower(), + data_bits) + + def __str__(self): + """Return the name of the sensor.""" + return self._name + + @property + def name(self): + """Return the device name.""" + return self._name + + @property + def is_pt2262(self): + """Return true if the device is PT2262-based.""" + return self._data_bits is not None + + @property + def masked_id(self): + """Return the masked device id (isolated address bits).""" + return self._masked_id + + @property + def data_bits(self): + """Return the number of data bits.""" + return self._data_bits + + @property + def cmd_on(self): + """Return the value of the 'On' command.""" + return self._cmd_on + + @property + def cmd_off(self): + """Return the value of the 'Off' command.""" + return self._cmd_off + + @property + def should_poll(self): + """No polling needed.""" + return False + + @property + def should_fire_event(self): + """Return is the device must fire event.""" + return self._should_fire_event + + @property + def sensor_class(self): + """Return the sensor class.""" + return self._sensor_class + + @property + def off_delay(self): + """Return the off_delay attribute value.""" + return self._off_delay + + @property + def is_on(self): + """Return true if the sensor state is True.""" + return self._state + + def apply_cmd(self, cmd): + """Apply a command for updating the state.""" + if cmd == self.cmd_on: + self.update_state(True) + elif cmd == self.cmd_off: + self.update_state(False) + + def update_state(self, state): + """Update the state of the device.""" + self._state = state + self.update_ha_state() diff --git a/homeassistant/components/rfxtrx.py b/homeassistant/components/rfxtrx.py index 4e0f89be315..4f92b1d99c9 100644 --- a/homeassistant/components/rfxtrx.py +++ b/homeassistant/components/rfxtrx.py @@ -10,9 +10,12 @@ import voluptuous as vol import homeassistant.helpers.config_validation as cv from homeassistant.util import slugify -from homeassistant.const import EVENT_HOMEASSISTANT_STOP +from homeassistant.const import ( + EVENT_HOMEASSISTANT_STOP, + ATTR_ENTITY_ID, TEMP_CELSIUS, + CONF_SENSOR_CLASS, CONF_COMMAND_ON, CONF_COMMAND_OFF +) from homeassistant.helpers.entity import Entity -from homeassistant.const import (ATTR_ENTITY_ID, TEMP_CELSIUS) REQUIREMENTS = ['pyRFXtrx==0.18.0'] @@ -27,7 +30,9 @@ ATTR_STATE = 'state' ATTR_NAME = 'name' ATTR_FIREEVENT = 'fire_event' ATTR_DATA_TYPE = 'data_type' +ATTR_DATA_BITS = 'data_bits' ATTR_DUMMY = 'dummy' +ATTR_OFF_DELAY = 'off_delay' CONF_SIGNAL_REPETITIONS = 'signal_repetitions' CONF_DEVICES = 'devices' EVENT_BUTTON_PRESSED = 'button_pressed' @@ -78,6 +83,8 @@ def _valid_device(value, device_type): if device_type == 'sensor': config[key] = DEVICE_SCHEMA_SENSOR(device) + elif device_type == 'binary_sensor': + config[key] = DEVICE_SCHEMA_BINARYSENSOR(device) elif device_type == 'light_switch': config[key] = DEVICE_SCHEMA(device) else: @@ -93,6 +100,11 @@ def valid_sensor(value): return _valid_device(value, "sensor") +def valid_binary_sensor(value): + """Validate binary sensor configuration.""" + return _valid_device(value, "binary_sensor") + + def _valid_light_switch(value): return _valid_device(value, "light_switch") @@ -109,6 +121,17 @@ DEVICE_SCHEMA_SENSOR = vol.Schema({ vol.All(cv.ensure_list, [vol.In(DATA_TYPES.keys())]), }) +DEVICE_SCHEMA_BINARYSENSOR = vol.Schema({ + vol.Optional(ATTR_NAME, default=None): cv.string, + vol.Optional(CONF_SENSOR_CLASS, default=None): cv.string, + vol.Optional(ATTR_FIREEVENT, default=False): cv.boolean, + vol.Optional(ATTR_OFF_DELAY, default=None): + vol.Any(cv.time_period, cv.positive_timedelta), + vol.Optional(ATTR_DATA_BITS, default=None): cv.positive_int, + vol.Optional(CONF_COMMAND_ON, default=None): cv.byte, + vol.Optional(CONF_COMMAND_OFF, default=None): cv.byte +}) + DEFAULT_SCHEMA = vol.Schema({ vol.Required("platform"): DOMAIN, vol.Optional(CONF_DEVICES, default={}): vol.All(dict, _valid_light_switch), @@ -192,6 +215,78 @@ def get_rfx_object(packetid): return obj +def get_pt2262_deviceid(device_id, nb_data_bits): + """Extract and return the address bits from a Lighting4/PT2262 packet.""" + import binascii + try: + data = bytearray.fromhex(device_id) + except ValueError: + return None + + mask = 0xFF & ~((1 << nb_data_bits) - 1) + + data[len(data)-1] &= mask + + return binascii.hexlify(data) + + +def get_pt2262_cmd(device_id, data_bits): + """Extract and return the data bits from a Lighting4/PT2262 packet.""" + try: + data = bytearray.fromhex(device_id) + except ValueError: + return None + + mask = 0xFF & ((1 << data_bits) - 1) + + return hex(data[-1] & mask) + + +# pylint: disable=unused-variable +def get_pt2262_device(device_id): + """Look for the device which id matches the given device_id parameter.""" + for dev_id, device in RFX_DEVICES.items(): + try: + if (device.is_pt2262 and + device.masked_id == get_pt2262_deviceid( + device_id, + device.data_bits)): + _LOGGER.info("rfxtrx: found matching device %s for %s", + device_id, + get_pt2262_deviceid(device_id, device.data_bits)) + return device + except AttributeError: + continue + return None + + +# pylint: disable=unused-variable +def find_possible_pt2262_device(device_id): + """Look for the device which id matches the given device_id parameter.""" + for dev_id, device in RFX_DEVICES.items(): + if len(dev_id) == len(device_id): + size = None + for i in range(0, len(dev_id)): + if dev_id[i] != device_id[i]: + break + size = i + + if size is not None: + size = len(dev_id) - size - 1 + _LOGGER.info("rfxtrx: found possible device %s for %s " + "with the following configuration:\n" + "data_bits=%d\n" + "command_on=0x%s\n" + "command_off=0x%s\n", + device_id, + dev_id, + size * 4, + dev_id[-size:], device_id[-size:]) + return device + + return None + + def get_devices_from_config(config, device, hass): """Read rfxtrx configuration.""" signal_repetitions = config[CONF_SIGNAL_REPETITIONS] @@ -319,6 +414,11 @@ class RfxtrxDevice(Entity): """Return is the device must fire event.""" return self._should_fire_event + @property + def is_pt2262(self): + """Return true if the device is PT2262-based.""" + return False + @property def is_on(self): """Return true if device is on."""