mirror of
https://github.com/home-assistant/core.git
synced 2025-07-23 13:17:32 +00:00
Rfxtrx binary sensor (#6794)
* Added rfxtrx binary sensors to a new branch * binary_sensor/rfxtrx: added support for automatic_add * Fixed pylint warnings * off_delay is set wit clearer time specifiers (cv.time_period) * fire_event config attribute is now supported by rfxtrx binary sensors. * Cosmetic ordering * Fix lint errors for PR requirements. * Fixed indents, line length and comment problems. * Yet another line too long fix... * Using existing attributes and config constants. * Cosmetic fix (ATTR_DATABITS -> ATTR_DATA_BITS) * Removed unused attribute * FIX masked device id logging message * FIX line too long * FIX trailing white space * FIX: rfxtrx binary_sensor manages its own devices only. * Added a basic config helper for pt2262 devices * Make pylint happy * Fixed most houndci-bot-detected issues * Fix TOX complaint about blank line after function docstring * Fix data bit value calculation * Fixed line too long * Removed unnecessary code. * remove trailing whitespace * Added hass property to device object.
This commit is contained in:
parent
6bfeac7f80
commit
f26861976d
234
homeassistant/components/binary_sensor/rfxtrx.py
Normal file
234
homeassistant/components/binary_sensor/rfxtrx.py
Normal file
@ -0,0 +1,234 @@
|
||||
"""
|
||||
Support for RFXtrx binary sensors.
|
||||
|
||||
Lighting4 devices (sensors based on PT2262 encoder) are supported and
|
||||
tested. Other types may need some work.
|
||||
|
||||
"""
|
||||
|
||||
import logging
|
||||
import voluptuous as vol
|
||||
from homeassistant.components import rfxtrx
|
||||
from homeassistant.util import slugify
|
||||
from homeassistant.util import dt as dt_util
|
||||
from homeassistant.helpers import config_validation as cv
|
||||
from homeassistant.helpers import event as evt
|
||||
from homeassistant.components.binary_sensor import BinarySensorDevice
|
||||
from homeassistant.components.rfxtrx import (
|
||||
ATTR_AUTOMATIC_ADD, ATTR_NAME, ATTR_OFF_DELAY, ATTR_FIREEVENT,
|
||||
ATTR_DATA_BITS, CONF_DEVICES
|
||||
)
|
||||
from homeassistant.const import (
|
||||
CONF_SENSOR_CLASS, CONF_COMMAND_ON, CONF_COMMAND_OFF
|
||||
)
|
||||
|
||||
DEPENDENCIES = ["rfxtrx"]
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
PLATFORM_SCHEMA = vol.Schema({
|
||||
vol.Required("platform"): rfxtrx.DOMAIN,
|
||||
vol.Optional(CONF_DEVICES, default={}): vol.All(
|
||||
dict, rfxtrx.valid_binary_sensor),
|
||||
vol.Optional(ATTR_AUTOMATIC_ADD, default=False): cv.boolean,
|
||||
}, extra=vol.ALLOW_EXTRA)
|
||||
|
||||
|
||||
def setup_platform(hass, config, add_devices_callback, discovery_info=None):
|
||||
"""Setup the Binary Sensor platform to rfxtrx."""
|
||||
import RFXtrx as rfxtrxmod
|
||||
sensors = []
|
||||
|
||||
for packet_id, entity in config['devices'].items():
|
||||
event = rfxtrx.get_rfx_object(packet_id)
|
||||
device_id = slugify(event.device.id_string.lower())
|
||||
|
||||
if device_id in rfxtrx.RFX_DEVICES:
|
||||
continue
|
||||
|
||||
if entity[ATTR_DATA_BITS] is not None:
|
||||
_LOGGER.info("Masked device id: %s",
|
||||
rfxtrx.get_pt2262_deviceid(device_id,
|
||||
entity[ATTR_DATA_BITS]))
|
||||
|
||||
_LOGGER.info("Add %s rfxtrx.binary_sensor (class %s)",
|
||||
entity[ATTR_NAME], entity[CONF_SENSOR_CLASS])
|
||||
|
||||
device = RfxtrxBinarySensor(event, entity[ATTR_NAME],
|
||||
entity[CONF_SENSOR_CLASS],
|
||||
entity[ATTR_FIREEVENT],
|
||||
entity[ATTR_OFF_DELAY],
|
||||
entity[ATTR_DATA_BITS],
|
||||
entity[CONF_COMMAND_ON],
|
||||
entity[CONF_COMMAND_OFF])
|
||||
device.hass = hass
|
||||
sensors.append(device)
|
||||
rfxtrx.RFX_DEVICES[device_id] = device
|
||||
|
||||
add_devices_callback(sensors)
|
||||
|
||||
# pylint: disable=too-many-branches
|
||||
def binary_sensor_update(event):
|
||||
"""Callback for control updates from the RFXtrx gateway."""
|
||||
if not isinstance(event, rfxtrxmod.ControlEvent):
|
||||
return
|
||||
|
||||
device_id = slugify(event.device.id_string.lower())
|
||||
|
||||
if device_id in rfxtrx.RFX_DEVICES:
|
||||
sensor = rfxtrx.RFX_DEVICES[device_id]
|
||||
else:
|
||||
sensor = rfxtrx.get_pt2262_device(device_id)
|
||||
|
||||
if sensor is None:
|
||||
# Add the entity if not exists and automatic_add is True
|
||||
if not config[ATTR_AUTOMATIC_ADD]:
|
||||
return
|
||||
|
||||
poss_dev = rfxtrx.find_possible_pt2262_device(device_id)
|
||||
|
||||
if poss_dev is not None:
|
||||
poss_id = slugify(poss_dev.event.device.id_string.lower())
|
||||
_LOGGER.info("Found possible matching deviceid %s.",
|
||||
poss_id)
|
||||
|
||||
pkt_id = "".join("{0:02x}".format(x) for x in event.data)
|
||||
sensor = RfxtrxBinarySensor(event, pkt_id)
|
||||
rfxtrx.RFX_DEVICES[device_id] = sensor
|
||||
add_devices_callback([sensor])
|
||||
_LOGGER.info("Added binary sensor %s "
|
||||
"(Device_id: %s Class: %s Sub: %s)",
|
||||
pkt_id,
|
||||
slugify(event.device.id_string.lower()),
|
||||
event.device.__class__.__name__,
|
||||
event.device.subtype)
|
||||
|
||||
elif not isinstance(sensor, RfxtrxBinarySensor):
|
||||
return
|
||||
else:
|
||||
_LOGGER.info("Binary sensor update "
|
||||
"(Device_id: %s Class: %s Sub: %s)",
|
||||
slugify(event.device.id_string.lower()),
|
||||
event.device.__class__.__name__,
|
||||
event.device.subtype)
|
||||
|
||||
if sensor.is_pt2262:
|
||||
cmd = rfxtrx.get_pt2262_cmd(device_id, sensor.data_bits)
|
||||
_LOGGER.info("applying cmd %s to device_id: %s)",
|
||||
cmd, sensor.masked_id)
|
||||
sensor.apply_cmd(int(cmd, 16))
|
||||
else:
|
||||
if not sensor.is_on or sensor.should_fire_event:
|
||||
sensor.update_state(True)
|
||||
|
||||
if (sensor.is_on and sensor.off_delay is not None and
|
||||
sensor.delay_listener is None):
|
||||
|
||||
def off_delay_listener(now):
|
||||
"""Switch device off after a delay."""
|
||||
sensor.delay_listener = None
|
||||
sensor.update_state(False)
|
||||
|
||||
sensor.delay_listener = evt.track_point_in_time(
|
||||
hass, off_delay_listener, dt_util.utcnow() + sensor.off_delay
|
||||
)
|
||||
|
||||
# Subscribe to main rfxtrx events
|
||||
if binary_sensor_update not in rfxtrx.RECEIVED_EVT_SUBSCRIBERS:
|
||||
rfxtrx.RECEIVED_EVT_SUBSCRIBERS.append(binary_sensor_update)
|
||||
|
||||
|
||||
# pylint: disable=too-many-instance-attributes,too-many-arguments
|
||||
class RfxtrxBinarySensor(BinarySensorDevice):
|
||||
"""An Rfxtrx binary sensor."""
|
||||
|
||||
def __init__(self, event, name, sensor_class=None,
|
||||
should_fire=False, off_delay=None, data_bits=None,
|
||||
cmd_on=None, cmd_off=None):
|
||||
"""Initialize the sensor."""
|
||||
self.event = event
|
||||
self._name = name
|
||||
self._should_fire_event = should_fire
|
||||
self._sensor_class = sensor_class
|
||||
self._off_delay = off_delay
|
||||
self._state = False
|
||||
self.delay_listener = None
|
||||
self._data_bits = data_bits
|
||||
self._cmd_on = cmd_on
|
||||
self._cmd_off = cmd_off
|
||||
|
||||
if data_bits is not None:
|
||||
self._masked_id = rfxtrx.get_pt2262_deviceid(
|
||||
event.device.id_string.lower(),
|
||||
data_bits)
|
||||
|
||||
def __str__(self):
|
||||
"""Return the name of the sensor."""
|
||||
return self._name
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
"""Return the device name."""
|
||||
return self._name
|
||||
|
||||
@property
|
||||
def is_pt2262(self):
|
||||
"""Return true if the device is PT2262-based."""
|
||||
return self._data_bits is not None
|
||||
|
||||
@property
|
||||
def masked_id(self):
|
||||
"""Return the masked device id (isolated address bits)."""
|
||||
return self._masked_id
|
||||
|
||||
@property
|
||||
def data_bits(self):
|
||||
"""Return the number of data bits."""
|
||||
return self._data_bits
|
||||
|
||||
@property
|
||||
def cmd_on(self):
|
||||
"""Return the value of the 'On' command."""
|
||||
return self._cmd_on
|
||||
|
||||
@property
|
||||
def cmd_off(self):
|
||||
"""Return the value of the 'Off' command."""
|
||||
return self._cmd_off
|
||||
|
||||
@property
|
||||
def should_poll(self):
|
||||
"""No polling needed."""
|
||||
return False
|
||||
|
||||
@property
|
||||
def should_fire_event(self):
|
||||
"""Return is the device must fire event."""
|
||||
return self._should_fire_event
|
||||
|
||||
@property
|
||||
def sensor_class(self):
|
||||
"""Return the sensor class."""
|
||||
return self._sensor_class
|
||||
|
||||
@property
|
||||
def off_delay(self):
|
||||
"""Return the off_delay attribute value."""
|
||||
return self._off_delay
|
||||
|
||||
@property
|
||||
def is_on(self):
|
||||
"""Return true if the sensor state is True."""
|
||||
return self._state
|
||||
|
||||
def apply_cmd(self, cmd):
|
||||
"""Apply a command for updating the state."""
|
||||
if cmd == self.cmd_on:
|
||||
self.update_state(True)
|
||||
elif cmd == self.cmd_off:
|
||||
self.update_state(False)
|
||||
|
||||
def update_state(self, state):
|
||||
"""Update the state of the device."""
|
||||
self._state = state
|
||||
self.update_ha_state()
|
@ -10,9 +10,12 @@ import voluptuous as vol
|
||||
|
||||
import homeassistant.helpers.config_validation as cv
|
||||
from homeassistant.util import slugify
|
||||
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
|
||||
from homeassistant.const import (
|
||||
EVENT_HOMEASSISTANT_STOP,
|
||||
ATTR_ENTITY_ID, TEMP_CELSIUS,
|
||||
CONF_SENSOR_CLASS, CONF_COMMAND_ON, CONF_COMMAND_OFF
|
||||
)
|
||||
from homeassistant.helpers.entity import Entity
|
||||
from homeassistant.const import (ATTR_ENTITY_ID, TEMP_CELSIUS)
|
||||
|
||||
REQUIREMENTS = ['pyRFXtrx==0.18.0']
|
||||
|
||||
@ -27,7 +30,9 @@ ATTR_STATE = 'state'
|
||||
ATTR_NAME = 'name'
|
||||
ATTR_FIREEVENT = 'fire_event'
|
||||
ATTR_DATA_TYPE = 'data_type'
|
||||
ATTR_DATA_BITS = 'data_bits'
|
||||
ATTR_DUMMY = 'dummy'
|
||||
ATTR_OFF_DELAY = 'off_delay'
|
||||
CONF_SIGNAL_REPETITIONS = 'signal_repetitions'
|
||||
CONF_DEVICES = 'devices'
|
||||
EVENT_BUTTON_PRESSED = 'button_pressed'
|
||||
@ -78,6 +83,8 @@ def _valid_device(value, device_type):
|
||||
|
||||
if device_type == 'sensor':
|
||||
config[key] = DEVICE_SCHEMA_SENSOR(device)
|
||||
elif device_type == 'binary_sensor':
|
||||
config[key] = DEVICE_SCHEMA_BINARYSENSOR(device)
|
||||
elif device_type == 'light_switch':
|
||||
config[key] = DEVICE_SCHEMA(device)
|
||||
else:
|
||||
@ -93,6 +100,11 @@ def valid_sensor(value):
|
||||
return _valid_device(value, "sensor")
|
||||
|
||||
|
||||
def valid_binary_sensor(value):
|
||||
"""Validate binary sensor configuration."""
|
||||
return _valid_device(value, "binary_sensor")
|
||||
|
||||
|
||||
def _valid_light_switch(value):
|
||||
return _valid_device(value, "light_switch")
|
||||
|
||||
@ -109,6 +121,17 @@ DEVICE_SCHEMA_SENSOR = vol.Schema({
|
||||
vol.All(cv.ensure_list, [vol.In(DATA_TYPES.keys())]),
|
||||
})
|
||||
|
||||
DEVICE_SCHEMA_BINARYSENSOR = vol.Schema({
|
||||
vol.Optional(ATTR_NAME, default=None): cv.string,
|
||||
vol.Optional(CONF_SENSOR_CLASS, default=None): cv.string,
|
||||
vol.Optional(ATTR_FIREEVENT, default=False): cv.boolean,
|
||||
vol.Optional(ATTR_OFF_DELAY, default=None):
|
||||
vol.Any(cv.time_period, cv.positive_timedelta),
|
||||
vol.Optional(ATTR_DATA_BITS, default=None): cv.positive_int,
|
||||
vol.Optional(CONF_COMMAND_ON, default=None): cv.byte,
|
||||
vol.Optional(CONF_COMMAND_OFF, default=None): cv.byte
|
||||
})
|
||||
|
||||
DEFAULT_SCHEMA = vol.Schema({
|
||||
vol.Required("platform"): DOMAIN,
|
||||
vol.Optional(CONF_DEVICES, default={}): vol.All(dict, _valid_light_switch),
|
||||
@ -192,6 +215,78 @@ def get_rfx_object(packetid):
|
||||
return obj
|
||||
|
||||
|
||||
def get_pt2262_deviceid(device_id, nb_data_bits):
|
||||
"""Extract and return the address bits from a Lighting4/PT2262 packet."""
|
||||
import binascii
|
||||
try:
|
||||
data = bytearray.fromhex(device_id)
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
mask = 0xFF & ~((1 << nb_data_bits) - 1)
|
||||
|
||||
data[len(data)-1] &= mask
|
||||
|
||||
return binascii.hexlify(data)
|
||||
|
||||
|
||||
def get_pt2262_cmd(device_id, data_bits):
|
||||
"""Extract and return the data bits from a Lighting4/PT2262 packet."""
|
||||
try:
|
||||
data = bytearray.fromhex(device_id)
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
mask = 0xFF & ((1 << data_bits) - 1)
|
||||
|
||||
return hex(data[-1] & mask)
|
||||
|
||||
|
||||
# pylint: disable=unused-variable
|
||||
def get_pt2262_device(device_id):
|
||||
"""Look for the device which id matches the given device_id parameter."""
|
||||
for dev_id, device in RFX_DEVICES.items():
|
||||
try:
|
||||
if (device.is_pt2262 and
|
||||
device.masked_id == get_pt2262_deviceid(
|
||||
device_id,
|
||||
device.data_bits)):
|
||||
_LOGGER.info("rfxtrx: found matching device %s for %s",
|
||||
device_id,
|
||||
get_pt2262_deviceid(device_id, device.data_bits))
|
||||
return device
|
||||
except AttributeError:
|
||||
continue
|
||||
return None
|
||||
|
||||
|
||||
# pylint: disable=unused-variable
|
||||
def find_possible_pt2262_device(device_id):
|
||||
"""Look for the device which id matches the given device_id parameter."""
|
||||
for dev_id, device in RFX_DEVICES.items():
|
||||
if len(dev_id) == len(device_id):
|
||||
size = None
|
||||
for i in range(0, len(dev_id)):
|
||||
if dev_id[i] != device_id[i]:
|
||||
break
|
||||
size = i
|
||||
|
||||
if size is not None:
|
||||
size = len(dev_id) - size - 1
|
||||
_LOGGER.info("rfxtrx: found possible device %s for %s "
|
||||
"with the following configuration:\n"
|
||||
"data_bits=%d\n"
|
||||
"command_on=0x%s\n"
|
||||
"command_off=0x%s\n",
|
||||
device_id,
|
||||
dev_id,
|
||||
size * 4,
|
||||
dev_id[-size:], device_id[-size:])
|
||||
return device
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def get_devices_from_config(config, device, hass):
|
||||
"""Read rfxtrx configuration."""
|
||||
signal_repetitions = config[CONF_SIGNAL_REPETITIONS]
|
||||
@ -319,6 +414,11 @@ class RfxtrxDevice(Entity):
|
||||
"""Return is the device must fire event."""
|
||||
return self._should_fire_event
|
||||
|
||||
@property
|
||||
def is_pt2262(self):
|
||||
"""Return true if the device is PT2262-based."""
|
||||
return False
|
||||
|
||||
@property
|
||||
def is_on(self):
|
||||
"""Return true if device is on."""
|
||||
|
Loading…
x
Reference in New Issue
Block a user