mirror of
https://github.com/home-assistant/core.git
synced 2025-07-24 21:57:51 +00:00
Enable ZigBee "push" updates for digital/analog sensors. (#1976)
This commit is contained in:
parent
d73f8d5253
commit
c72ab42c19
@ -5,14 +5,18 @@ For more details about this component, please refer to the documentation at
|
|||||||
https://home-assistant.io/components/zigbee/
|
https://home-assistant.io/components/zigbee/
|
||||||
"""
|
"""
|
||||||
import logging
|
import logging
|
||||||
|
import pickle
|
||||||
from binascii import hexlify, unhexlify
|
from binascii import hexlify, unhexlify
|
||||||
|
from base64 import b64encode, b64decode
|
||||||
|
|
||||||
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
|
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
|
||||||
from homeassistant.core import JobPriority
|
from homeassistant.core import JobPriority
|
||||||
from homeassistant.helpers.entity import Entity
|
from homeassistant.helpers.entity import Entity
|
||||||
|
|
||||||
DOMAIN = "zigbee"
|
DOMAIN = "zigbee"
|
||||||
REQUIREMENTS = ("xbee-helper==0.0.6",)
|
REQUIREMENTS = ("xbee-helper==0.0.7",)
|
||||||
|
|
||||||
|
EVENT_ZIGBEE_FRAME_RECEIVED = "zigbee_frame_received"
|
||||||
|
|
||||||
CONF_DEVICE = "device"
|
CONF_DEVICE = "device"
|
||||||
CONF_BAUD = "baud"
|
CONF_BAUD = "baud"
|
||||||
@ -25,9 +29,14 @@ DEFAULT_ADC_MAX_VOLTS = 1.2
|
|||||||
GPIO_DIGITAL_OUTPUT_LOW = None
|
GPIO_DIGITAL_OUTPUT_LOW = None
|
||||||
GPIO_DIGITAL_OUTPUT_HIGH = None
|
GPIO_DIGITAL_OUTPUT_HIGH = None
|
||||||
ADC_PERCENTAGE = None
|
ADC_PERCENTAGE = None
|
||||||
|
DIGITAL_PINS = None
|
||||||
|
ANALOG_PINS = None
|
||||||
|
CONVERT_ADC = None
|
||||||
ZIGBEE_EXCEPTION = None
|
ZIGBEE_EXCEPTION = None
|
||||||
ZIGBEE_TX_FAILURE = None
|
ZIGBEE_TX_FAILURE = None
|
||||||
|
|
||||||
|
ATTR_FRAME = "frame"
|
||||||
|
|
||||||
DEVICE = None
|
DEVICE = None
|
||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
@ -39,17 +48,24 @@ def setup(hass, config):
|
|||||||
global GPIO_DIGITAL_OUTPUT_LOW
|
global GPIO_DIGITAL_OUTPUT_LOW
|
||||||
global GPIO_DIGITAL_OUTPUT_HIGH
|
global GPIO_DIGITAL_OUTPUT_HIGH
|
||||||
global ADC_PERCENTAGE
|
global ADC_PERCENTAGE
|
||||||
|
global DIGITAL_PINS
|
||||||
|
global ANALOG_PINS
|
||||||
|
global CONVERT_ADC
|
||||||
global ZIGBEE_EXCEPTION
|
global ZIGBEE_EXCEPTION
|
||||||
global ZIGBEE_TX_FAILURE
|
global ZIGBEE_TX_FAILURE
|
||||||
|
|
||||||
import xbee_helper.const as xb_const
|
import xbee_helper.const as xb_const
|
||||||
from xbee_helper import ZigBee
|
from xbee_helper import ZigBee
|
||||||
|
from xbee_helper.device import convert_adc
|
||||||
from xbee_helper.exceptions import ZigBeeException, ZigBeeTxFailure
|
from xbee_helper.exceptions import ZigBeeException, ZigBeeTxFailure
|
||||||
from serial import Serial, SerialException
|
from serial import Serial, SerialException
|
||||||
|
|
||||||
GPIO_DIGITAL_OUTPUT_LOW = xb_const.GPIO_DIGITAL_OUTPUT_LOW
|
GPIO_DIGITAL_OUTPUT_LOW = xb_const.GPIO_DIGITAL_OUTPUT_LOW
|
||||||
GPIO_DIGITAL_OUTPUT_HIGH = xb_const.GPIO_DIGITAL_OUTPUT_HIGH
|
GPIO_DIGITAL_OUTPUT_HIGH = xb_const.GPIO_DIGITAL_OUTPUT_HIGH
|
||||||
ADC_PERCENTAGE = xb_const.ADC_PERCENTAGE
|
ADC_PERCENTAGE = xb_const.ADC_PERCENTAGE
|
||||||
|
DIGITAL_PINS = xb_const.DIGITAL_PINS
|
||||||
|
ANALOG_PINS = xb_const.ANALOG_PINS
|
||||||
|
CONVERT_ADC = convert_adc
|
||||||
ZIGBEE_EXCEPTION = ZigBeeException
|
ZIGBEE_EXCEPTION = ZigBeeException
|
||||||
ZIGBEE_TX_FAILURE = ZigBeeTxFailure
|
ZIGBEE_TX_FAILURE = ZigBeeTxFailure
|
||||||
|
|
||||||
@ -62,6 +78,19 @@ def setup(hass, config):
|
|||||||
return False
|
return False
|
||||||
DEVICE = ZigBee(ser)
|
DEVICE = ZigBee(ser)
|
||||||
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, close_serial_port)
|
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, close_serial_port)
|
||||||
|
|
||||||
|
def _frame_received(frame):
|
||||||
|
"""Called when a ZigBee frame is received.
|
||||||
|
|
||||||
|
Pickles the frame, then encodes it into base64 since it contains
|
||||||
|
non JSON serializable binary.
|
||||||
|
"""
|
||||||
|
hass.bus.fire(
|
||||||
|
EVENT_ZIGBEE_FRAME_RECEIVED,
|
||||||
|
{ATTR_FRAME: b64encode(pickle.dumps(frame)).decode("ascii")})
|
||||||
|
|
||||||
|
DEVICE.add_frame_rx_handler(_frame_received)
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
@ -70,6 +99,25 @@ def close_serial_port(*args):
|
|||||||
DEVICE.zb.serial.close()
|
DEVICE.zb.serial.close()
|
||||||
|
|
||||||
|
|
||||||
|
def frame_is_relevant(entity, frame):
|
||||||
|
"""Test whether the frame is relevant to the entity."""
|
||||||
|
if frame.get("source_addr_long") != entity.config.address:
|
||||||
|
return False
|
||||||
|
if "samples" not in frame:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def subscribe(hass, callback):
|
||||||
|
"""Subscribe to incoming ZigBee frames."""
|
||||||
|
def zigbee_frame_subscriber(event):
|
||||||
|
"""Decode and unpickle the frame from the event bus, and call back."""
|
||||||
|
frame = pickle.loads(b64decode(event.data[ATTR_FRAME]))
|
||||||
|
callback(frame)
|
||||||
|
|
||||||
|
hass.bus.listen(EVENT_ZIGBEE_FRAME_RECEIVED, zigbee_frame_subscriber)
|
||||||
|
|
||||||
|
|
||||||
class ZigBeeConfig(object):
|
class ZigBeeConfig(object):
|
||||||
"""Handle the fetching of configuration from the config file."""
|
"""Handle the fetching of configuration from the config file."""
|
||||||
|
|
||||||
@ -110,14 +158,65 @@ class ZigBeePinConfig(ZigBeeConfig):
|
|||||||
return self._config["pin"]
|
return self._config["pin"]
|
||||||
|
|
||||||
|
|
||||||
class ZigBeeDigitalPinConfig(ZigBeePinConfig):
|
class ZigBeeDigitalInConfig(ZigBeePinConfig):
|
||||||
"""Handle the fetching of configuration from the config file."""
|
"""A subclass of ZigBeePinConfig."""
|
||||||
|
|
||||||
def __init__(self, config):
|
def __init__(self, config):
|
||||||
"""Initialize the configuration."""
|
"""Initialise the ZigBee Digital input config."""
|
||||||
super(ZigBeeDigitalPinConfig, self).__init__(config)
|
super(ZigBeeDigitalInConfig, self).__init__(config)
|
||||||
self._bool2state, self._state2bool = self.boolean_maps
|
self._bool2state, self._state2bool = self.boolean_maps
|
||||||
|
|
||||||
|
@property
|
||||||
|
def boolean_maps(self):
|
||||||
|
"""Create mapping dictionaries for potential inversion of booleans.
|
||||||
|
|
||||||
|
Create dicts to map the pin state (true/false) to potentially inverted
|
||||||
|
values depending on the on_state config value which should be set to
|
||||||
|
"low" or "high".
|
||||||
|
"""
|
||||||
|
if self._config.get("on_state", "").lower() == "low":
|
||||||
|
bool2state = {
|
||||||
|
True: False,
|
||||||
|
False: True
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
bool2state = {
|
||||||
|
True: True,
|
||||||
|
False: False
|
||||||
|
}
|
||||||
|
state2bool = {v: k for k, v in bool2state.items()}
|
||||||
|
return bool2state, state2bool
|
||||||
|
|
||||||
|
@property
|
||||||
|
def bool2state(self):
|
||||||
|
"""A dictionary mapping the internal value to the ZigBee value.
|
||||||
|
|
||||||
|
For the translation of on/off as being pin high or low.
|
||||||
|
"""
|
||||||
|
return self._bool2state
|
||||||
|
|
||||||
|
@property
|
||||||
|
def state2bool(self):
|
||||||
|
"""A dictionary mapping the ZigBee value to the internal value.
|
||||||
|
|
||||||
|
For the translation of pin high/low as being on or off.
|
||||||
|
"""
|
||||||
|
return self._state2bool
|
||||||
|
|
||||||
|
|
||||||
|
class ZigBeeDigitalOutConfig(ZigBeePinConfig):
|
||||||
|
"""A subclass of ZigBeePinConfig.
|
||||||
|
|
||||||
|
Set _should_poll to default as False instead of True. The value will
|
||||||
|
still be overridden by the presence of a 'poll' config entry.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, config):
|
||||||
|
"""Initialize the ZigBee Digital out."""
|
||||||
|
super(ZigBeeDigitalOutConfig, self).__init__(config)
|
||||||
|
self._bool2state, self._state2bool = self.boolean_maps
|
||||||
|
self._should_poll = config.get("poll", False)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def boolean_maps(self):
|
def boolean_maps(self):
|
||||||
"""Create dicts to map booleans to pin high/low and vice versa.
|
"""Create dicts to map booleans to pin high/low and vice versa.
|
||||||
@ -154,22 +253,6 @@ class ZigBeeDigitalPinConfig(ZigBeePinConfig):
|
|||||||
"""
|
"""
|
||||||
return self._state2bool
|
return self._state2bool
|
||||||
|
|
||||||
# Create an alias so that ZigBeeDigitalOutConfig has a logical opposite.
|
|
||||||
ZigBeeDigitalInConfig = ZigBeeDigitalPinConfig
|
|
||||||
|
|
||||||
|
|
||||||
class ZigBeeDigitalOutConfig(ZigBeeDigitalPinConfig):
|
|
||||||
"""A subclass of ZigBeeDigitalPinConfig.
|
|
||||||
|
|
||||||
Set _should_poll to default as False instead of True. The value will
|
|
||||||
still be overridden by the presence of a 'poll' config entry.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, config):
|
|
||||||
"""Initialize the ZigBee Digital out."""
|
|
||||||
super(ZigBeeDigitalOutConfig, self).__init__(config)
|
|
||||||
self._should_poll = config.get("poll", False)
|
|
||||||
|
|
||||||
|
|
||||||
class ZigBeeAnalogInConfig(ZigBeePinConfig):
|
class ZigBeeAnalogInConfig(ZigBeePinConfig):
|
||||||
"""Representation of a ZigBee GPIO pin set to analog in."""
|
"""Representation of a ZigBee GPIO pin set to analog in."""
|
||||||
@ -187,6 +270,25 @@ class ZigBeeDigitalIn(Entity):
|
|||||||
"""Initialize the device."""
|
"""Initialize the device."""
|
||||||
self._config = config
|
self._config = config
|
||||||
self._state = False
|
self._state = False
|
||||||
|
|
||||||
|
def handle_frame(frame):
|
||||||
|
"""Handle an incoming frame.
|
||||||
|
|
||||||
|
Handle an incoming frame and update our status if it contains
|
||||||
|
information relating to this device.
|
||||||
|
"""
|
||||||
|
if not frame_is_relevant(self, frame):
|
||||||
|
return
|
||||||
|
sample = frame["samples"].pop()
|
||||||
|
pin_name = DIGITAL_PINS[self._config.pin]
|
||||||
|
if pin_name not in sample:
|
||||||
|
# Doesn't contain information about our pin
|
||||||
|
return
|
||||||
|
self._state = self._config.state2bool[sample[pin_name]]
|
||||||
|
self.update_ha_state()
|
||||||
|
|
||||||
|
subscribe(hass, handle_frame)
|
||||||
|
|
||||||
# Get initial state
|
# Get initial state
|
||||||
hass.pool.add_job(
|
hass.pool.add_job(
|
||||||
JobPriority.EVENT_STATE, (self.update_ha_state, True))
|
JobPriority.EVENT_STATE, (self.update_ha_state, True))
|
||||||
@ -196,6 +298,11 @@ class ZigBeeDigitalIn(Entity):
|
|||||||
"""Return the name of the input."""
|
"""Return the name of the input."""
|
||||||
return self._config.name
|
return self._config.name
|
||||||
|
|
||||||
|
@property
|
||||||
|
def config(self):
|
||||||
|
"""The entity's configuration."""
|
||||||
|
return self._config
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def should_poll(self):
|
def should_poll(self):
|
||||||
"""Return the state of the polling, if needed."""
|
"""Return the state of the polling, if needed."""
|
||||||
@ -207,11 +314,9 @@ class ZigBeeDigitalIn(Entity):
|
|||||||
return self._state
|
return self._state
|
||||||
|
|
||||||
def update(self):
|
def update(self):
|
||||||
"""Ask the ZigBee device what its output is set to."""
|
"""Ask the ZigBee device what state its input pin is in."""
|
||||||
try:
|
try:
|
||||||
pin_state = DEVICE.get_gpio_pin(
|
sample = DEVICE.get_sample(self._config.address)
|
||||||
self._config.pin,
|
|
||||||
self._config.address)
|
|
||||||
except ZIGBEE_TX_FAILURE:
|
except ZIGBEE_TX_FAILURE:
|
||||||
_LOGGER.warning(
|
_LOGGER.warning(
|
||||||
"Transmission failure when attempting to get sample from "
|
"Transmission failure when attempting to get sample from "
|
||||||
@ -221,7 +326,14 @@ class ZigBeeDigitalIn(Entity):
|
|||||||
_LOGGER.exception(
|
_LOGGER.exception(
|
||||||
"Unable to get sample from ZigBee device: %s", exc)
|
"Unable to get sample from ZigBee device: %s", exc)
|
||||||
return
|
return
|
||||||
self._state = self._config.state2bool[pin_state]
|
pin_name = DIGITAL_PINS[self._config.pin]
|
||||||
|
if pin_name not in sample:
|
||||||
|
_LOGGER.warning(
|
||||||
|
"Pin %s (%s) was not in the sample provided by ZigBee device "
|
||||||
|
"%s.",
|
||||||
|
self._config.pin, pin_name, hexlify(self._config.address))
|
||||||
|
return
|
||||||
|
self._state = self._config.state2bool[sample[pin_name]]
|
||||||
|
|
||||||
|
|
||||||
class ZigBeeDigitalOut(ZigBeeDigitalIn):
|
class ZigBeeDigitalOut(ZigBeeDigitalIn):
|
||||||
@ -255,6 +367,24 @@ class ZigBeeDigitalOut(ZigBeeDigitalIn):
|
|||||||
"""Set the digital output to its 'off' state."""
|
"""Set the digital output to its 'off' state."""
|
||||||
self._set_state(False)
|
self._set_state(False)
|
||||||
|
|
||||||
|
def update(self):
|
||||||
|
"""Ask the ZigBee device what its output is set to."""
|
||||||
|
try:
|
||||||
|
pin_state = DEVICE.get_gpio_pin(
|
||||||
|
self._config.pin,
|
||||||
|
self._config.address)
|
||||||
|
except ZIGBEE_TX_FAILURE:
|
||||||
|
_LOGGER.warning(
|
||||||
|
"Transmission failure when attempting to get output pin status"
|
||||||
|
" from ZigBee device at address: %s",
|
||||||
|
hexlify(self._config.address))
|
||||||
|
return
|
||||||
|
except ZIGBEE_EXCEPTION as exc:
|
||||||
|
_LOGGER.exception(
|
||||||
|
"Unable to get output pin status from ZigBee device: %s", exc)
|
||||||
|
return
|
||||||
|
self._state = self._config.state2bool[pin_state]
|
||||||
|
|
||||||
|
|
||||||
class ZigBeeAnalogIn(Entity):
|
class ZigBeeAnalogIn(Entity):
|
||||||
"""Representation of a GPIO pin configured as an analog input."""
|
"""Representation of a GPIO pin configured as an analog input."""
|
||||||
@ -263,6 +393,29 @@ class ZigBeeAnalogIn(Entity):
|
|||||||
"""Initialize the ZigBee analog in device."""
|
"""Initialize the ZigBee analog in device."""
|
||||||
self._config = config
|
self._config = config
|
||||||
self._value = None
|
self._value = None
|
||||||
|
|
||||||
|
def handle_frame(frame):
|
||||||
|
"""Handle an incoming frame.
|
||||||
|
|
||||||
|
Handle an incoming frame and update our status if it contains
|
||||||
|
information relating to this device.
|
||||||
|
"""
|
||||||
|
if not frame_is_relevant(self, frame):
|
||||||
|
return
|
||||||
|
sample = frame["samples"].pop()
|
||||||
|
pin_name = ANALOG_PINS[self._config.pin]
|
||||||
|
if pin_name not in sample:
|
||||||
|
# Doesn't contain information about our pin
|
||||||
|
return
|
||||||
|
self._value = CONVERT_ADC(
|
||||||
|
sample[pin_name],
|
||||||
|
ADC_PERCENTAGE,
|
||||||
|
self._config.max_voltage
|
||||||
|
)
|
||||||
|
self.update_ha_state()
|
||||||
|
|
||||||
|
subscribe(hass, handle_frame)
|
||||||
|
|
||||||
# Get initial state
|
# Get initial state
|
||||||
hass.pool.add_job(
|
hass.pool.add_job(
|
||||||
JobPriority.EVENT_STATE, (self.update_ha_state, True))
|
JobPriority.EVENT_STATE, (self.update_ha_state, True))
|
||||||
@ -272,6 +425,11 @@ class ZigBeeAnalogIn(Entity):
|
|||||||
"""The name of the input."""
|
"""The name of the input."""
|
||||||
return self._config.name
|
return self._config.name
|
||||||
|
|
||||||
|
@property
|
||||||
|
def config(self):
|
||||||
|
"""The entity's configuration."""
|
||||||
|
return self._config
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def should_poll(self):
|
def should_poll(self):
|
||||||
"""The state of the polling, if needed."""
|
"""The state of the polling, if needed."""
|
||||||
|
@ -339,7 +339,7 @@ vsure==0.8.1
|
|||||||
wakeonlan==0.2.2
|
wakeonlan==0.2.2
|
||||||
|
|
||||||
# homeassistant.components.zigbee
|
# homeassistant.components.zigbee
|
||||||
xbee-helper==0.0.6
|
xbee-helper==0.0.7
|
||||||
|
|
||||||
# homeassistant.components.sensor.yr
|
# homeassistant.components.sensor.yr
|
||||||
xmltodict
|
xmltodict
|
||||||
|
Loading…
x
Reference in New Issue
Block a user