Rflink 433Mhz gateway platform and components (#4547)

* Initial sketches of rflink component.

* Add requirement.

* Properly load configuration.

* Bump rflink for graceful parse errors and protocol callback.

* Cleanup, documentation and linting.

* More documentation, first sensor implementation (temp & hum).

* Add brightness/dim support for newkaku protocol.

* Use separate class for dimmables.

* Make sure non-dimmable newkaku devices are turned on.

* Move some code around, add switches. Support loading from config.

* Fix bug in ignoring devices.

* Fix initial state assumption.

* Improve reliability on invalid conditions.

* Allow configuration of group for new devices.

* Sensor icons.

* Fix parsing negative numbers.

* Correct icon.

* Allow sending commands serial.

* Pluralize.

* Allow adding sensors from config.

* Fix ignoring devices and bugs in previous commit.

* Share know devices so devices from configuration don't get added as lights.

* Lookup unit from value_key.

* Remove debug.

* Start implementing event protocol in place of packet protocol.

- Added first test suite for sensors.
- This currently breaks light and switch.

* Refactor switch component to fit new rflink changes. Add test suite.

* Fix style.

* Refactor and test lights. Bring coverage to 100%.

* Use non-broken and production tested rflink module.

* Update requirements.

* Bump for logging.

* Improve readability.

* Do not use global variable but keep known device state in intended place.

* Improve docs.

* Make icon support generic.

* Disable overriding icons in config, as it belongs in customization. Only keep custom icon for entities that are able to detect a icon based on the thing they represent (sensors in this case).

* Implement configuration schema, overall refactor of magic values.

* Fix bug in config/test wait_for_ack.

* Small refactors.

* Move command logic into separate class.

* Convert command sending logic to class based pattern instead of using the event bus.

* Start not using bus for rflink event propagation to platforms.

* Do not use event bus for all entity types.

* Fire an event on the bus for every switch incoming rflink command.

* Resolve lint errors, remove some old code.

* Known devices no longer need to be registered separately.

* Log bus events.

* Event callback is a..... callback.

* Use full entity id for events.

* Move event sending to entity.

* Log incoming events.

* Make firing events optional inline with rfxtrx.

* Add foundation for signal repetition.

* Add signal repetition config and tests.

* Make plain switchable type explicitly configurable.

* Enable default entity settings for automatically added entities as well.

* Prevent default configuration leaking accross entities.

* Make sure device defaults don't get overwritten by defaults further down.

* Don't let fast state switching and repetitions turn your house into a disco.

* Make repetitions more responsive.

* Disable on/off fallback on dimmables as it currently doesn't play nice with repetitions.

* Use rflink that allows send_command_ack to be safely cancelled.

* Reduce duplication and make repeat work for non-ack.

* Implement reconnection logic.

* Improve reconnection logic.

* Also cancel repetitions when entity state is changed due to external command.

* Update requirements.

* Fix linting.

* Fix spelling.

* Don't lie.

* Fix lint.

* Support for automatically creating protocol translation (fixes spaces in device names).

* Returned support for dimmable and on/off entity.

* Duplicate code to fix linting issues with inheritance.

* Allow overriding unit of measurement from config.
This commit is contained in:
Johan Bloemberg 2017-01-31 17:11:52 +01:00 committed by Paulus Schoutsen
parent 9925b2a8e0
commit bbda2a72f4
9 changed files with 1598 additions and 0 deletions

View File

@ -0,0 +1,239 @@
"""Support for Rflink lights.
For more details about this platform, please refer to the documentation
at https://home-assistant.io/components/light.rflink/
"""
import asyncio
import logging
from homeassistant.components import group
from homeassistant.components.light import (
ATTR_BRIGHTNESS, SUPPORT_BRIGHTNESS, Light)
from homeassistant.components.rflink import (
CONF_ALIASSES, CONF_DEVICE_DEFAULTS, CONF_DEVICES, CONF_FIRE_EVENT,
CONF_IGNORE_DEVICES, CONF_NEW_DEVICES_GROUP, CONF_SIGNAL_REPETITIONS,
DATA_DEVICE_REGISTER, DATA_ENTITY_LOOKUP, DEFAULT_SIGNAL_REPETITIONS,
DOMAIN, EVENT_KEY_COMMAND, EVENT_KEY_ID, SwitchableRflinkDevice, cv, vol)
from homeassistant.const import CONF_NAME, CONF_PLATFORM, CONF_TYPE
DEPENDENCIES = ['rflink']
_LOGGER = logging.getLogger(__name__)
TYPE_DIMMABLE = 'dimmable'
TYPE_SWITCHABLE = 'switchable'
TYPE_HYBRID = 'hybrid'
DEVICE_DEFAULTS_SCHEMA = vol.Schema({
vol.Optional(CONF_FIRE_EVENT, default=False): cv.boolean,
vol.Optional(CONF_SIGNAL_REPETITIONS,
default=DEFAULT_SIGNAL_REPETITIONS): vol.Coerce(int),
})
PLATFORM_SCHEMA = vol.Schema({
vol.Required(CONF_PLATFORM): DOMAIN,
vol.Optional(CONF_NEW_DEVICES_GROUP, default=None): cv.string,
vol.Optional(CONF_IGNORE_DEVICES): vol.All(cv.ensure_list, [cv.string]),
vol.Optional(CONF_DEVICE_DEFAULTS, default=DEVICE_DEFAULTS_SCHEMA({})):
DEVICE_DEFAULTS_SCHEMA,
vol.Optional(CONF_DEVICES, default={}): vol.Schema({
cv.string: {
vol.Optional(CONF_NAME): cv.string,
vol.Optional(CONF_TYPE):
vol.Any(TYPE_DIMMABLE, TYPE_SWITCHABLE, TYPE_HYBRID),
vol.Optional(CONF_ALIASSES, default=[]):
vol.All(cv.ensure_list, [cv.string]),
vol.Optional(CONF_FIRE_EVENT, default=False): cv.boolean,
vol.Optional(CONF_SIGNAL_REPETITIONS): vol.Coerce(int),
},
}),
})
def entity_type_for_device_id(device_id):
"""Return entity class for procotol of a given device_id.
Async friendly.
"""
entity_type_mapping = {
# KlikAanKlikUit support both dimmers and on/off switches on the same
# protocol
'newkaku': TYPE_HYBRID,
}
protocol = device_id.split('_')[0]
return entity_type_mapping.get(protocol, None)
def entity_class_for_type(entity_type):
"""Translate entity type to entity class.
Async friendly.
"""
entity_device_mapping = {
# sends only 'dim' commands not compatible with on/off switches
TYPE_DIMMABLE: DimmableRflinkLight,
# sends only 'on/off' commands not advices with dimmers and signal
# repetition
TYPE_SWITCHABLE: RflinkLight,
# sends 'dim' and 'on' command to support both dimmers and on/off
# switches. Not compatible with signal repetition.
TYPE_HYBRID: HybridRflinkLight,
}
return entity_device_mapping.get(entity_type, RflinkLight)
def devices_from_config(domain_config, hass=None):
"""Parse config and add rflink switch devices."""
devices = []
for device_id, config in domain_config[CONF_DEVICES].items():
# determine which kind of entity to create
if CONF_TYPE in config:
# remove type from config to not pass it as and argument to entity
# instantiation
entity_type = config.pop(CONF_TYPE)
else:
entity_type = entity_type_for_device_id(device_id)
entity_class = entity_class_for_type(entity_type)
device_config = dict(domain_config[CONF_DEVICE_DEFAULTS], **config)
is_hybrid = entity_class is HybridRflinkLight
# make user aware this can cause problems
repetitions_enabled = device_config[CONF_SIGNAL_REPETITIONS] != 1
if is_hybrid and repetitions_enabled:
_LOGGER.warning(
"Hybrid type for %s not compatible with signal "
"repetitions. Please set 'dimmable' or 'switchable' "
"type explicity in configuration.",
device_id)
device = entity_class(device_id, hass, **device_config)
devices.append(device)
# register entity (and aliasses) to listen to incoming rflink events
for _id in [device_id] + config[CONF_ALIASSES]:
hass.data[DATA_ENTITY_LOOKUP][
EVENT_KEY_COMMAND][_id].append(device)
return devices
@asyncio.coroutine
def async_setup_platform(hass, config, async_add_devices, discovery_info=None):
"""Setup the Rflink platform."""
# add devices from config
yield from async_add_devices(devices_from_config(config, hass))
# add new (unconfigured) devices to user desired group
if config[CONF_NEW_DEVICES_GROUP]:
new_devices_group = yield from group.Group.async_create_group(
hass, config[CONF_NEW_DEVICES_GROUP], [], True)
else:
new_devices_group = None
@asyncio.coroutine
def add_new_device(event):
"""Check if device is known, otherwise add to list of known devices."""
device_id = event[EVENT_KEY_ID]
entity_type = entity_type_for_device_id(event[EVENT_KEY_ID])
entity_class = entity_class_for_type(entity_type)
device_config = config[CONF_DEVICE_DEFAULTS]
device = entity_class(device_id, hass, **device_config)
yield from async_add_devices([device])
# register entity to listen to incoming rflink events
hass.data[DATA_ENTITY_LOOKUP][
EVENT_KEY_COMMAND][device_id].append(device)
# make sure the event is processed by the new entity
device.handle_event(event)
# maybe add to new devices group
if new_devices_group:
yield from new_devices_group.async_update_tracked_entity_ids(
list(new_devices_group.tracking) + [device.entity_id])
hass.data[DATA_DEVICE_REGISTER][EVENT_KEY_COMMAND] = add_new_device
class RflinkLight(SwitchableRflinkDevice, Light):
"""Representation of a Rflink light."""
pass
class DimmableRflinkLight(SwitchableRflinkDevice, Light):
"""Rflink light device that support dimming."""
_brightness = 255
@asyncio.coroutine
def async_turn_on(self, **kwargs):
"""Turn the device on."""
if ATTR_BRIGHTNESS in kwargs:
# rflink only support 16 brightness levels
self._brightness = int(kwargs[ATTR_BRIGHTNESS] / 17) * 17
# turn on light at the requested dim level
yield from self._async_handle_command('dim', self._brightness)
@property
def brightness(self):
"""Return the brightness of this light between 0..255."""
return self._brightness
@property
def supported_features(self):
"""Flag supported features."""
return SUPPORT_BRIGHTNESS
class HybridRflinkLight(SwitchableRflinkDevice, Light):
"""Rflink light device that sends out both dim and on/off commands.
Used for protocols which support lights that are not exclusively on/off
style. For example KlikAanKlikUit supports both on/off and dimmable light
switches using the same protocol. This type allows unconfigured
KlikAanKlikUit devices to support dimming without breaking support for
on/off switches.
This type is not compatible with signal repetitions as the 'dim' and 'on'
command are send sequential and multiple 'on' commands to a dimmable
device can cause the dimmer to switch into a pulsating brightness mode.
Which results in a nice house disco :)
"""
_brightness = 255
@asyncio.coroutine
def async_turn_on(self, **kwargs):
"""Turn the device on and set dim level."""
if ATTR_BRIGHTNESS in kwargs:
# rflink only support 16 brightness levels
self._brightness = int(kwargs[ATTR_BRIGHTNESS] / 17) * 17
# if receiver supports dimming this will turn on the light
# at the requested dim level
yield from self._async_handle_command('dim', self._brightness)
# if the receiving device does not support dimlevel this
# will ensure it is turned on when full brightness is set
if self._brightness == 255:
yield from self._async_handle_command("turn_on")
@property
def brightness(self):
"""Return the brightness of this light between 0..255."""
return self._brightness
@property
def supported_features(self):
"""Flag supported features."""
return SUPPORT_BRIGHTNESS

View File

@ -0,0 +1,396 @@
"""Support for Rflink components.
For more details about this component, please refer to the documentation at
https://home-assistant.io/components/rflink/
Technical overview:
The Rflink gateway is a USB serial device (Arduino with Rflink firwmare)
connected to a 433Mhz transceiver module.
The the `rflink` Python module a asyncio transport/protocol is setup that
fires an callback for every (valid/supported) packet received by the Rflink
gateway.
This component uses this callback to distribute 'rflink packet events' over
the HASS bus which can be subscribed to by entities/platform implementations.
The platform implementions take care of creating new devices (if enabled) for
unsees incoming packet id's.
Device Entities take care of matching to the packet id, interpreting and
performing actions based on the packet contents. Common entitiy logic is
maintained in this file.
"""
import asyncio
from collections import defaultdict
import functools as ft
import logging
from homeassistant.const import (
ATTR_ENTITY_ID, CONF_HOST, CONF_PORT, EVENT_HOMEASSISTANT_STOP,
STATE_UNKNOWN)
from homeassistant.core import CoreState, callback
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity import Entity
import voluptuous as vol
REQUIREMENTS = ['rflink==0.0.24']
DOMAIN = 'rflink'
CONF_ALIASSES = 'aliasses'
CONF_DEVICES = 'devices'
CONF_DEVICE_DEFAULTS = 'device_defaults'
CONF_FIRE_EVENT = 'fire_event'
CONF_IGNORE_DEVICES = 'ignore_devices'
CONF_NEW_DEVICES_GROUP = 'new_devices_group'
CONF_RECONNECT_INTERVAL = 'reconnect_interval'
CONF_SIGNAL_REPETITIONS = 'signal_repetitions'
CONF_WAIT_FOR_ACK = 'wait_for_ack'
DEFAULT_SIGNAL_REPETITIONS = 1
DEFAULT_RECONNECT_INTERVAL = 10
CONFIG_SCHEMA = vol.Schema({
DOMAIN: vol.Schema({
vol.Required(CONF_PORT): vol.Any(cv.port, cv.string),
vol.Optional(CONF_HOST, default=None): cv.string,
vol.Optional(CONF_WAIT_FOR_ACK, default=True): cv.boolean,
vol.Optional(CONF_RECONNECT_INTERVAL,
default=DEFAULT_RECONNECT_INTERVAL): int,
vol.Optional(CONF_IGNORE_DEVICES, default=[]):
vol.All(cv.ensure_list, [cv.string]),
}),
}, extra=vol.ALLOW_EXTRA)
ATTR_EVENT = 'event'
ATTR_STATE = 'state'
DATA_DEVICE_REGISTER = 'rflink_device_register'
DATA_ENTITY_LOOKUP = 'rflink_entity_lookup'
EVENT_BUTTON_PRESSED = 'button_pressed'
EVENT_KEY_COMMAND = 'command'
EVENT_KEY_ID = 'id'
EVENT_KEY_SENSOR = 'sensor'
EVENT_KEY_UNIT = 'unit'
_LOGGER = logging.getLogger(__name__)
def identify_event_type(event):
"""Look at event to determine type of device.
Async friendly.
"""
if EVENT_KEY_COMMAND in event:
return EVENT_KEY_COMMAND
elif EVENT_KEY_SENSOR in event:
return EVENT_KEY_SENSOR
else:
return 'unknown'
@asyncio.coroutine
def async_setup(hass, config):
"""Setup the Rflink component."""
from rflink.protocol import create_rflink_connection
import serial
# allow entities to register themselves by device_id to be looked up when
# new rflink events arrive to be handled
hass.data[DATA_ENTITY_LOOKUP] = {
EVENT_KEY_COMMAND: defaultdict(list),
EVENT_KEY_SENSOR: defaultdict(list),
}
# allow platform to specify function to register new unknown devices
hass.data[DATA_DEVICE_REGISTER] = {}
@callback
def event_callback(event):
"""Handle incoming rflink events.
Rflink events arrive as dictionaries of varying content
depending on their type. Identify the events and distribute
accordingly.
"""
event_type = identify_event_type(event)
_LOGGER.debug('event of type %s: %s', event_type, event)
# don't propagate non entity events (eg: version string, ack response)
if event_type not in hass.data[DATA_ENTITY_LOOKUP]:
_LOGGER.debug('unhandled event of type: %s', event_type)
return
# lookup entities who registered this device id as device id or alias
event_id = event.get('id', None)
entities = hass.data[DATA_ENTITY_LOOKUP][event_type][event_id]
if entities:
# propagate event to every entity matching the device id
for entity in entities:
_LOGGER.debug('passing event to %s', entities)
entity.handle_event(event)
else:
_LOGGER.debug('device_id not known, adding new device')
# if device is not yet known, register with platform (if loaded)
if event_type in hass.data[DATA_DEVICE_REGISTER]:
hass.async_run_job(
hass.data[DATA_DEVICE_REGISTER][event_type], event)
# when connecting to tcp host instead of serial port (optional)
host = config[DOMAIN][CONF_HOST]
# tcp port when host configured, otherwise serial port
port = config[DOMAIN][CONF_PORT]
@callback
def reconnect(exc=None):
"""Schedule reconnect after connection has been unexpectedly lost."""
# reset protocol binding before starting reconnect
RflinkCommand.set_rflink_protocol(None)
# if HA is not stopping, initiate new connection
if hass.state != CoreState.stopping:
_LOGGER.warning('disconnected from Rflink, reconnecting')
hass.async_add_job(connect)
@asyncio.coroutine
def connect():
"""Setup connection and hook it into HA for reconnect/shutdown."""
_LOGGER.info('initiating Rflink connection')
# rflink create_rflink_connection decides based on the value of host
# (string or None) if serial or tcp mode should be used
# initiate serial/tcp connection to Rflink gateway
connection = create_rflink_connection(
port=port,
host=host,
event_callback=event_callback,
disconnect_callback=reconnect,
loop=hass.loop,
ignore=config[DOMAIN][CONF_IGNORE_DEVICES]
)
try:
transport, protocol = yield from connection
except (serial.serialutil.SerialException, ConnectionRefusedError,
TimeoutError) as exc:
reconnect_interval = config[DOMAIN][CONF_RECONNECT_INTERVAL]
_LOGGER.exception(
'error connecting to Rflink, reconnecting in %s',
reconnect_interval)
hass.loop.call_later(reconnect_interval, reconnect, exc)
return
# bind protocol to command class to allow entities to send commands
RflinkCommand.set_rflink_protocol(
protocol, config[DOMAIN][CONF_WAIT_FOR_ACK])
# handle shutdown of rflink asyncio transport
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP,
lambda x: transport.close())
_LOGGER.info('connected to Rflink')
# make initial connection
yield from connect()
# whoo
return True
class RflinkDevice(Entity):
"""Represents a Rflink device.
Contains the common logic for Rflink entities.
"""
# should be set by component implementation
platform = None
# default state
_state = STATE_UNKNOWN
def __init__(self, device_id, hass, name=None,
aliasses=None, fire_event=False,
signal_repetitions=DEFAULT_SIGNAL_REPETITIONS):
"""Initialize the device."""
self.hass = hass
# rflink specific attributes for every component type
self._device_id = device_id
if name:
self._name = name
else:
self._name = device_id
# generate list of device_ids to match against
if aliasses:
self._aliasses = aliasses
else:
self._aliasses = []
self._should_fire_event = fire_event
self._signal_repetitions = signal_repetitions
def handle_event(self, event):
"""Handle incoming event for device type."""
# call platform specific event handler
self._handle_event(event)
# propagate changes through ha
self.hass.async_add_job(self.async_update_ha_state())
# put command onto bus for user to subscribe to
if self._should_fire_event and identify_event_type(
event) == EVENT_KEY_COMMAND:
self.hass.bus.fire(EVENT_BUTTON_PRESSED, {
ATTR_ENTITY_ID: self.entity_id,
ATTR_STATE: event[EVENT_KEY_COMMAND],
})
_LOGGER.debug(
'fired bus event for %s: %s',
self.entity_id,
event[EVENT_KEY_COMMAND])
def _handle_event(self, event):
"""Platform specific event handler."""
raise NotImplementedError()
@property
def should_poll(self):
"""No polling needed."""
return False
@property
def name(self):
"""Return a name for the device."""
return self._name
@property
def is_on(self):
"""Return true if device is on."""
if self.assumed_state:
return False
return self._state
@property
def assumed_state(self):
"""Assume device state until first device event sets state."""
return self._state is STATE_UNKNOWN
class RflinkCommand(RflinkDevice):
"""Singleton class to make Rflink command interface available to entities.
This class is to be inherited by every Entity class that is actionable
(switches/lights). It exposes the Rflink command interface for these
entities.
The Rflink interface is managed as a class level and set during setup (and
reset on reconnect).
"""
# keep repetition tasks to cancel if state is changed before repetitions
# are sent
_repetition_task = None
@classmethod
def set_rflink_protocol(cls, protocol, wait_ack=None):
"""Set the Rflink asyncio protocol as a class variable."""
cls._protocol = protocol
if wait_ack is not None:
cls._wait_ack = wait_ack
@asyncio.coroutine
def _async_handle_command(self, command, *args):
"""Do bookkeeping for command, send it to rflink and update state."""
self.cancel_queued_send_commands()
if command == "turn_on":
cmd = 'on'
self._state = True
elif command == 'turn_off':
cmd = 'off'
self._state = False
elif command == 'dim':
# convert brightness to rflink dim level
cmd = str(int(args[0] / 17))
self._state = True
# send initial command and queue repetitions
# this allows the entity state to be updated quickly and not having to
# wait for all repetitions to be sent
yield from self._async_send_command(cmd, self._signal_repetitions)
# Update state of entity
yield from self.async_update_ha_state()
def cancel_queued_send_commands(self):
"""Cancel queued signal repetition commands.
For example when user changed state while repetitions are still
queued for broadcast. Or when a incoming Rflink command (remote
switch) changes the state.
"""
# cancel any outstanding tasks from the previous state change
if self._repetition_task:
self._repetition_task.cancel()
@asyncio.coroutine
def _async_send_command(self, cmd, repetitions):
"""Send a command for device to Rflink gateway."""
_LOGGER.debug('sending command: %s to rflink device: %s',
cmd, self._device_id)
if self._wait_ack:
# Puts command on outgoing buffer then waits for Rflink to confirm
# the command has been send out in the ether.
yield from self._protocol.send_command_ack(self._device_id, cmd)
else:
# Puts command on outgoing buffer and returns straight away.
# Rflink protocol/transport handles asynchronous writing of buffer
# to serial/tcp device. Does not wait for command send
# confirmation.
self.hass.loop.run_in_executor(None, ft.partial(
self._protocol.send_command, self._device_id, cmd))
if repetitions > 1:
self._repetition_task = self.hass.loop.create_task(
self._async_send_command(cmd, repetitions - 1))
class SwitchableRflinkDevice(RflinkCommand):
"""Rflink entity which can switch on/off (eg: light, switch)."""
def _handle_event(self, event):
"""Adjust state if Rflink picks up a remote command for this device."""
self.cancel_queued_send_commands()
command = event['command']
if command == 'on':
self._state = True
elif command == 'off':
self._state = False
@asyncio.coroutine
def async_turn_on(self, **kwargs):
"""Turn the device on."""
yield from self._async_handle_command("turn_on")
@asyncio.coroutine
def async_turn_off(self, **kwargs):
"""Turn the device off."""
yield from self._async_handle_command("turn_off")

View File

@ -0,0 +1,141 @@
"""Support for Rflink sensors.
For more details about this platform, please refer to the documentation
at https://home-assistant.io/components/light.rflink/
"""
import asyncio
from functools import partial
import logging
from homeassistant.components import group
from homeassistant.components.rflink import (
CONF_ALIASSES, CONF_DEVICES, CONF_NEW_DEVICES_GROUP, DATA_DEVICE_REGISTER,
DATA_ENTITY_LOOKUP, DOMAIN, EVENT_KEY_ID, EVENT_KEY_SENSOR, EVENT_KEY_UNIT,
RflinkDevice, cv, vol)
from homeassistant.const import (
ATTR_UNIT_OF_MEASUREMENT, CONF_NAME, CONF_PLATFORM,
CONF_UNIT_OF_MEASUREMENT)
DEPENDENCIES = ['rflink']
_LOGGER = logging.getLogger(__name__)
SENSOR_ICONS = {
'humidity': 'mdi:water-percent',
'battery': 'mdi:battery',
'temperature': 'mdi:thermometer',
}
CONF_SENSOR_TYPE = 'sensor_type'
PLATFORM_SCHEMA = vol.Schema({
vol.Required(CONF_PLATFORM): DOMAIN,
vol.Optional(CONF_NEW_DEVICES_GROUP, default=None): cv.string,
vol.Optional(CONF_DEVICES, default={}): vol.Schema({
cv.string: {
vol.Optional(CONF_NAME): cv.string,
vol.Required(CONF_SENSOR_TYPE): cv.string,
vol.Optional(CONF_UNIT_OF_MEASUREMENT, default=None): cv.string,
vol.Optional(CONF_ALIASSES, default=[]):
vol.All(cv.ensure_list, [cv.string]),
},
}),
})
def lookup_unit_for_sensor_type(sensor_type):
"""Get unit for sensor type.
Async friendly.
"""
from rflink.parser import UNITS, PACKET_FIELDS
field_abbrev = {v: k for k, v in PACKET_FIELDS.items()}
return UNITS.get(field_abbrev.get(sensor_type))
def devices_from_config(domain_config, hass=None):
"""Parse config and add rflink sensor devices."""
devices = []
for device_id, config in domain_config[CONF_DEVICES].items():
if not config[ATTR_UNIT_OF_MEASUREMENT]:
config[ATTR_UNIT_OF_MEASUREMENT] = lookup_unit_for_sensor_type(
config[CONF_SENSOR_TYPE])
device = RflinkSensor(device_id, hass, **config)
devices.append(device)
# register entity to listen to incoming rflink events
hass.data[DATA_ENTITY_LOOKUP][
EVENT_KEY_SENSOR][device_id].append(device)
return devices
@asyncio.coroutine
def async_setup_platform(hass, config, async_add_devices, discovery_info=None):
"""Setup the Rflink platform."""
# add devices from config
yield from async_add_devices(devices_from_config(config, hass))
# add new (unconfigured) devices to user desired group
if config[CONF_NEW_DEVICES_GROUP]:
new_devices_group = yield from group.Group.async_create_group(
hass, config[CONF_NEW_DEVICES_GROUP], [], True)
else:
new_devices_group = None
@asyncio.coroutine
def add_new_device(event):
"""Check if device is known, otherwise create device entity."""
device_id = event[EVENT_KEY_ID]
rflinksensor = partial(RflinkSensor, device_id, hass)
device = rflinksensor(event[EVENT_KEY_SENSOR], event[EVENT_KEY_UNIT])
# add device entity
yield from async_add_devices([device])
# register entity to listen to incoming rflink events
hass.data[DATA_ENTITY_LOOKUP][
EVENT_KEY_SENSOR][device_id].append(device)
# make sure the event is processed by the new entity
device.handle_event(event)
# maybe add to new devices group
if new_devices_group:
yield from new_devices_group.async_update_tracked_entity_ids(
list(new_devices_group.tracking) + [device.entity_id])
hass.data[DATA_DEVICE_REGISTER][EVENT_KEY_SENSOR] = add_new_device
class RflinkSensor(RflinkDevice):
"""Representation of a Rflink sensor."""
def __init__(self, device_id, hass, sensor_type,
unit_of_measurement, **kwargs):
"""Handle sensor specific args and super init."""
self._sensor_type = sensor_type
self._unit_of_measurement = unit_of_measurement
super().__init__(device_id, hass, **kwargs)
def _handle_event(self, event):
"""Domain specific event handler."""
self._state = event['value']
@property
def unit_of_measurement(self):
"""Return measurement unit."""
return self._unit_of_measurement
@property
def state(self):
"""Return value."""
return self._state
@property
def icon(self):
"""Return possible sensor specific icon."""
if self._sensor_type in SENSOR_ICONS:
return SENSOR_ICONS[self._sensor_type]

View File

@ -0,0 +1,65 @@
"""Support for Rflink switches.
For more details about this platform, please refer to the documentation
at https://home-assistant.io/components/switch.rflink/
"""
import asyncio
import logging
from homeassistant.components.rflink import (
CONF_ALIASSES, CONF_DEVICE_DEFAULTS, CONF_DEVICES, CONF_FIRE_EVENT,
CONF_SIGNAL_REPETITIONS, DATA_ENTITY_LOOKUP, DEFAULT_SIGNAL_REPETITIONS,
DOMAIN, EVENT_KEY_COMMAND, SwitchableRflinkDevice, cv, vol)
from homeassistant.components.switch import SwitchDevice
from homeassistant.const import CONF_NAME, CONF_PLATFORM
DEPENDENCIES = ['rflink']
_LOGGER = logging.getLogger(__name__)
PLATFORM_SCHEMA = vol.Schema({
vol.Required(CONF_PLATFORM): DOMAIN,
vol.Optional(CONF_DEVICE_DEFAULTS, default={}): vol.Schema({
vol.Optional(CONF_FIRE_EVENT, default=False): cv.boolean,
vol.Optional(CONF_SIGNAL_REPETITIONS,
default=DEFAULT_SIGNAL_REPETITIONS): vol.Coerce(int),
}),
vol.Optional(CONF_DEVICES, default={}): vol.Schema({
cv.string: {
vol.Optional(CONF_NAME): cv.string,
vol.Optional(CONF_ALIASSES, default=[]):
vol.All(cv.ensure_list, [cv.string]),
vol.Optional(CONF_FIRE_EVENT, default=False): cv.boolean,
vol.Optional(CONF_SIGNAL_REPETITIONS): vol.Coerce(int),
},
}),
})
def devices_from_config(domain_config, hass=None):
"""Parse config and add rflink switch devices."""
devices = []
for device_id, config in domain_config[CONF_DEVICES].items():
device_config = dict(domain_config[CONF_DEVICE_DEFAULTS], **config)
device = RflinkSwitch(device_id, hass, **device_config)
devices.append(device)
# register entity (and aliasses) to listen to incoming rflink events
for _id in config[CONF_ALIASSES] + [device_id]:
hass.data[DATA_ENTITY_LOOKUP][
EVENT_KEY_COMMAND][_id].append(device)
return devices
@asyncio.coroutine
def async_setup_platform(hass, config, async_add_devices, discovery_info=None):
"""Setup the Rflink platform."""
yield from async_add_devices(devices_from_config(config, hass))
class RflinkSwitch(SwitchableRflinkDevice, SwitchDevice):
"""Representation of a Rflink switch."""
pass

View File

@ -560,6 +560,9 @@ pyzabbix==0.7.4
# homeassistant.components.climate.radiotherm
radiotherm==1.2
# homeassistant.components.rflink
rflink==0.0.24
# homeassistant.components.switch.rpi_rf
# rpi-rf==0.9.6

View File

@ -0,0 +1,373 @@
"""Test for RFlink light components.
Test setup of rflink lights component/platform. State tracking and
control of Rflink switch devices.
"""
import asyncio
from homeassistant.components.light import ATTR_BRIGHTNESS
from homeassistant.components.rflink import EVENT_BUTTON_PRESSED
from homeassistant.const import (
ATTR_ENTITY_ID, SERVICE_TURN_OFF, SERVICE_TURN_ON)
from homeassistant.core import callback
from ..test_rflink import mock_rflink
DOMAIN = 'light'
CONFIG = {
'rflink': {
'port': '/dev/ttyABC0',
'ignore_devices': ['ignore_wildcard_*', 'ignore_light'],
},
DOMAIN: {
'platform': 'rflink',
'devices': {
'protocol_0_0': {
'name': 'test',
'aliasses': ['test_alias_0_0'],
},
'dimmable_0_0': {
'name': 'dim_test',
'type': 'dimmable',
},
'switchable_0_0': {
'name': 'switch_test',
'type': 'switchable',
}
},
},
}
@asyncio.coroutine
def test_default_setup(hass, monkeypatch):
"""Test all basic functionality of the rflink switch component."""
# setup mocking rflink module
event_callback, create, protocol, _ = yield from mock_rflink(
hass, CONFIG, DOMAIN, monkeypatch)
# make sure arguments are passed
assert create.call_args_list[0][1]['ignore']
# test default state of light loaded from config
light_initial = hass.states.get('light.test')
assert light_initial.state == 'off'
assert light_initial.attributes['assumed_state']
# light should follow state of the hardware device by interpreting
# incoming events for its name and aliasses
# mock incoming command event for this device
event_callback({
'id': 'protocol_0_0',
'command': 'on',
})
yield from hass.async_block_till_done()
light_after_first_command = hass.states.get('light.test')
assert light_after_first_command.state == 'on'
# also after receiving first command state not longer has to be assumed
assert 'assumed_state' not in light_after_first_command.attributes
# mock incoming command event for this device
event_callback({
'id': 'protocol_0_0',
'command': 'off',
})
yield from hass.async_block_till_done()
assert hass.states.get('light.test').state == 'off'
# test following aliasses
# mock incoming command event for this device alias
event_callback({
'id': 'test_alias_0_0',
'command': 'on',
})
yield from hass.async_block_till_done()
assert hass.states.get('light.test').state == 'on'
# test event for new unconfigured sensor
event_callback({
'id': 'protocol2_0_1',
'command': 'on',
})
yield from hass.async_block_till_done()
assert hass.states.get('light.protocol2_0_1').state == 'on'
# test changing state from HA propagates to Rflink
hass.async_add_job(
hass.services.async_call(DOMAIN, SERVICE_TURN_OFF,
{ATTR_ENTITY_ID: 'light.test'}))
yield from hass.async_block_till_done()
assert hass.states.get('light.test').state == 'off'
assert protocol.send_command_ack.call_args_list[0][0][0] == 'protocol_0_0'
assert protocol.send_command_ack.call_args_list[0][0][1] == 'off'
hass.async_add_job(
hass.services.async_call(DOMAIN, SERVICE_TURN_ON,
{ATTR_ENTITY_ID: 'light.test'}))
yield from hass.async_block_till_done()
assert hass.states.get('light.test').state == 'on'
assert protocol.send_command_ack.call_args_list[1][0][1] == 'on'
# protocols supporting dimming and on/off should create hybrid light entity
event_callback({
'id': 'newkaku_0_1',
'command': 'off',
})
yield from hass.async_block_till_done()
hass.async_add_job(
hass.services.async_call(DOMAIN, SERVICE_TURN_ON,
{ATTR_ENTITY_ID: 'light.newkaku_0_1'}))
yield from hass.async_block_till_done()
# dimmable should send highest dim level when turning on
assert protocol.send_command_ack.call_args_list[2][0][1] == '15'
# and send on command for fallback
assert protocol.send_command_ack.call_args_list[3][0][1] == 'on'
hass.async_add_job(
hass.services.async_call(DOMAIN, SERVICE_TURN_ON,
{
ATTR_ENTITY_ID: 'light.newkaku_0_1',
ATTR_BRIGHTNESS: 128,
}))
yield from hass.async_block_till_done()
assert protocol.send_command_ack.call_args_list[4][0][1] == '7'
hass.async_add_job(
hass.services.async_call(DOMAIN, SERVICE_TURN_ON,
{
ATTR_ENTITY_ID: 'light.dim_test',
ATTR_BRIGHTNESS: 128,
}))
yield from hass.async_block_till_done()
assert protocol.send_command_ack.call_args_list[5][0][1] == '7'
@asyncio.coroutine
def test_new_light_group(hass, monkeypatch):
"""New devices should be added to configured group."""
config = {
'rflink': {
'port': '/dev/ttyABC0',
},
DOMAIN: {
'platform': 'rflink',
'new_devices_group': 'new_rflink_lights',
},
}
# setup mocking rflink module
event_callback, _, _, _ = yield from mock_rflink(
hass, config, DOMAIN, monkeypatch)
# test event for new unconfigured sensor
event_callback({
'id': 'protocol_0_0',
'command': 'off',
})
yield from hass.async_block_till_done()
# make sure new device is added to correct group
group = hass.states.get('group.new_rflink_lights')
assert group.attributes.get('entity_id') == ('light.protocol_0_0',)
@asyncio.coroutine
def test_firing_bus_event(hass, monkeypatch):
"""Incoming Rflink command events should be put on the HA event bus."""
config = {
'rflink': {
'port': '/dev/ttyABC0',
},
DOMAIN: {
'platform': 'rflink',
'devices': {
'protocol_0_0': {
'name': 'test',
'aliasses': ['test_alias_0_0'],
'fire_event': True,
},
},
},
}
# setup mocking rflink module
event_callback, _, _, _ = yield from mock_rflink(
hass, config, DOMAIN, monkeypatch)
calls = []
@callback
def listener(event):
calls.append(event)
hass.bus.async_listen_once(EVENT_BUTTON_PRESSED, listener)
# test event for new unconfigured sensor
event_callback({
'id': 'protocol_0_0',
'command': 'off',
})
yield from hass.async_block_till_done()
assert calls[0].data == {'state': 'off', 'entity_id': 'light.test'}
@asyncio.coroutine
def test_signal_repetitions(hass, monkeypatch):
"""Command should be sent amount of configured repetitions."""
config = {
'rflink': {
'port': '/dev/ttyABC0',
},
DOMAIN: {
'platform': 'rflink',
'device_defaults': {
'signal_repetitions': 3,
},
'devices': {
'protocol_0_0': {
'name': 'test',
'signal_repetitions': 2,
},
'protocol_0_1': {
'name': 'test1',
},
'newkaku_0_1': {
'type': 'hybrid',
}
},
},
}
# setup mocking rflink module
event_callback, _, protocol, _ = yield from mock_rflink(
hass, config, DOMAIN, monkeypatch)
# test if signal repetition is performed according to configuration
hass.async_add_job(
hass.services.async_call(DOMAIN, SERVICE_TURN_OFF,
{ATTR_ENTITY_ID: 'light.test'}))
# wait for commands and repetitions to finish
yield from hass.async_block_till_done()
assert protocol.send_command_ack.call_count == 2
# test if default apply to configured devcies
hass.async_add_job(
hass.services.async_call(DOMAIN, SERVICE_TURN_OFF,
{ATTR_ENTITY_ID: 'light.test1'}))
# wait for commands and repetitions to finish
yield from hass.async_block_till_done()
assert protocol.send_command_ack.call_count == 5
# test if device defaults apply to newly created devices
event_callback({
'id': 'protocol_0_2',
'command': 'off',
})
# make sure entity is created before setting state
yield from hass.async_block_till_done()
hass.async_add_job(
hass.services.async_call(DOMAIN, SERVICE_TURN_OFF,
{ATTR_ENTITY_ID: 'light.protocol_0_2'}))
# wait for commands and repetitions to finish
yield from hass.async_block_till_done()
assert protocol.send_command_ack.call_count == 8
@asyncio.coroutine
def test_signal_repetitions_alternation(hass, monkeypatch):
"""Simultaneously switching entities must alternate repetitions."""
config = {
'rflink': {
'port': '/dev/ttyABC0',
},
DOMAIN: {
'platform': 'rflink',
'devices': {
'protocol_0_0': {
'name': 'test',
'signal_repetitions': 2,
},
'protocol_0_1': {
'name': 'test1',
'signal_repetitions': 2,
},
},
},
}
# setup mocking rflink module
_, _, protocol, _ = yield from mock_rflink(
hass, config, DOMAIN, monkeypatch)
hass.async_add_job(
hass.services.async_call(DOMAIN, SERVICE_TURN_OFF,
{ATTR_ENTITY_ID: 'light.test'}))
hass.async_add_job(
hass.services.async_call(DOMAIN, SERVICE_TURN_OFF,
{ATTR_ENTITY_ID: 'light.test1'}))
yield from hass.async_block_till_done()
assert protocol.send_command_ack.call_args_list[0][0][0] == 'protocol_0_0'
assert protocol.send_command_ack.call_args_list[1][0][0] == 'protocol_0_1'
assert protocol.send_command_ack.call_args_list[2][0][0] == 'protocol_0_0'
assert protocol.send_command_ack.call_args_list[3][0][0] == 'protocol_0_1'
@asyncio.coroutine
def test_signal_repetitions_cancelling(hass, monkeypatch):
"""Cancel outstanding repetitions when state changed."""
config = {
'rflink': {
'port': '/dev/ttyABC0',
},
DOMAIN: {
'platform': 'rflink',
'devices': {
'protocol_0_0': {
'name': 'test',
'signal_repetitions': 3,
},
},
},
}
# setup mocking rflink module
_, _, protocol, _ = yield from mock_rflink(
hass, config, DOMAIN, monkeypatch)
hass.async_add_job(
hass.services.async_call(DOMAIN, SERVICE_TURN_OFF,
{ATTR_ENTITY_ID: 'light.test'}))
hass.async_add_job(
hass.services.async_call(DOMAIN, SERVICE_TURN_ON,
{ATTR_ENTITY_ID: 'light.test'}))
yield from hass.async_block_till_done()
print(protocol.send_command_ack.call_args_list)
assert protocol.send_command_ack.call_args_list[0][0][1] == 'off'
assert protocol.send_command_ack.call_args_list[1][0][1] == 'on'
assert protocol.send_command_ack.call_args_list[2][0][1] == 'on'
assert protocol.send_command_ack.call_args_list[3][0][1] == 'on'

View File

@ -0,0 +1,103 @@
"""Test for RFlink sensor components.
Test setup of rflink sensor component/platform. Verify manual and
automatic sensor creation.
"""
import asyncio
from ..test_rflink import mock_rflink
DOMAIN = 'sensor'
CONFIG = {
'rflink': {
'port': '/dev/ttyABC0',
'ignore_devices': ['ignore_wildcard_*', 'ignore_sensor'],
},
DOMAIN: {
'platform': 'rflink',
'devices': {
'test': {
'name': 'test',
'sensor_type': 'temperature',
},
},
},
}
@asyncio.coroutine
def test_default_setup(hass, monkeypatch):
"""Test all basic functionality of the rflink sensor component."""
# setup mocking rflink module
event_callback, create, _, _ = yield from mock_rflink(
hass, CONFIG, DOMAIN, monkeypatch)
# make sure arguments are passed
assert create.call_args_list[0][1]['ignore']
# test default state of sensor loaded from config
config_sensor = hass.states.get('sensor.test')
assert config_sensor
assert config_sensor.state == 'unknown'
assert config_sensor.attributes['unit_of_measurement'] == '°C'
# test event for config sensor
event_callback({
'id': 'test',
'sensor': 'temperature',
'value': 1,
'unit': '°C',
})
yield from hass.async_block_till_done()
assert hass.states.get('sensor.test').state == '1'
# test event for new unconfigured sensor
event_callback({
'id': 'test2',
'sensor': 'temperature',
'value': 0,
'unit': '°C',
})
yield from hass.async_block_till_done()
# test state of new sensor
new_sensor = hass.states.get('sensor.test2')
assert new_sensor
assert new_sensor.state == '0'
assert new_sensor.attributes['unit_of_measurement'] == '°C'
assert new_sensor.attributes['icon'] == 'mdi:thermometer'
@asyncio.coroutine
def test_new_sensors_group(hass, monkeypatch):
"""New devices should be added to configured group."""
config = {
'rflink': {
'port': '/dev/ttyABC0',
},
DOMAIN: {
'platform': 'rflink',
'new_devices_group': 'new_rflink_sensors',
},
}
# setup mocking rflink module
event_callback, _, _, _ = yield from mock_rflink(
hass, config, DOMAIN, monkeypatch)
# test event for new unconfigured sensor
event_callback({
'id': 'test',
'sensor': 'temperature',
'value': 0,
'unit': '°C',
})
yield from hass.async_block_till_done()
# make sure new device is added to correct group
group = hass.states.get('group.new_rflink_sensors')
assert group.attributes.get('entity_id') == ('sensor.test',)

View File

@ -0,0 +1,100 @@
"""Test for RFlink switch components.
Test setup of rflink switch component/platform. State tracking and
control of Rflink switch devices.
"""
import asyncio
from homeassistant.const import (
ATTR_ENTITY_ID, SERVICE_TURN_OFF, SERVICE_TURN_ON)
from ..test_rflink import mock_rflink
DOMAIN = 'switch'
CONFIG = {
'rflink': {
'port': '/dev/ttyABC0',
'ignore_devices': ['ignore_wildcard_*', 'ignore_sensor'],
},
DOMAIN: {
'platform': 'rflink',
'devices': {
'protocol_0_0': {
'name': 'test',
'aliasses': ['test_alias_0_0'],
},
},
},
}
@asyncio.coroutine
def test_default_setup(hass, monkeypatch):
"""Test all basic functionality of the rflink switch component."""
# setup mocking rflink module
event_callback, create, protocol, _ = yield from mock_rflink(
hass, CONFIG, DOMAIN, monkeypatch)
# make sure arguments are passed
assert create.call_args_list[0][1]['ignore']
# test default state of switch loaded from config
switch_initial = hass.states.get('switch.test')
assert switch_initial.state == 'off'
assert switch_initial.attributes['assumed_state']
# switch should follow state of the hardware device by interpreting
# incoming events for its name and aliasses
# mock incoming command event for this device
event_callback({
'id': 'protocol_0_0',
'command': 'on',
})
yield from hass.async_block_till_done()
switch_after_first_command = hass.states.get('switch.test')
assert switch_after_first_command.state == 'on'
# also after receiving first command state not longer has to be assumed
assert 'assumed_state' not in switch_after_first_command.attributes
# mock incoming command event for this device
event_callback({
'id': 'protocol_0_0',
'command': 'off',
})
yield from hass.async_block_till_done()
assert hass.states.get('switch.test').state == 'off'
# test following aliasses
# mock incoming command event for this device alias
event_callback({
'id': 'test_alias_0_0',
'command': 'on',
})
yield from hass.async_block_till_done()
assert hass.states.get('switch.test').state == 'on'
# The switch component does not support adding new devices for incoming
# events because every new unkown device is added as a light by default.
# test changing state from HA propagates to Rflink
hass.async_add_job(
hass.services.async_call(DOMAIN, SERVICE_TURN_OFF,
{ATTR_ENTITY_ID: 'switch.test'}))
yield from hass.async_block_till_done()
assert hass.states.get('switch.test').state == 'off'
assert protocol.send_command_ack.call_args_list[0][0][0] == 'protocol_0_0'
assert protocol.send_command_ack.call_args_list[0][0][1] == 'off'
hass.async_add_job(
hass.services.async_call(DOMAIN, SERVICE_TURN_ON,
{ATTR_ENTITY_ID: 'switch.test'}))
yield from hass.async_block_till_done()
assert hass.states.get('switch.test').state == 'on'
assert protocol.send_command_ack.call_args_list[1][0][1] == 'on'

View File

@ -0,0 +1,178 @@
"""Common functions for Rflink component tests and generic platform tests."""
import asyncio
from unittest.mock import Mock
from homeassistant.bootstrap import async_setup_component
from homeassistant.components.rflink import CONF_RECONNECT_INTERVAL
from homeassistant.const import ATTR_ENTITY_ID, SERVICE_TURN_OFF
from tests.common import assert_setup_component
@asyncio.coroutine
def mock_rflink(hass, config, domain, monkeypatch, failures=None):
"""Create mock Rflink asyncio protocol, test component setup."""
transport, protocol = (Mock(), Mock())
@asyncio.coroutine
def send_command_ack(*command):
return True
protocol.send_command_ack = Mock(wraps=send_command_ack)
@asyncio.coroutine
def send_command(*command):
return True
protocol.send_command = Mock(wraps=send_command)
@asyncio.coroutine
def create_rflink_connection(*args, **kwargs):
"""Return mocked transport and protocol."""
# failures can be a list of booleans indicating in which sequence
# creating a connection should success or fail
if failures:
fail = failures.pop()
else:
fail = False
if fail:
raise ConnectionRefusedError
else:
return transport, protocol
mock_create = Mock(wraps=create_rflink_connection)
monkeypatch.setattr(
'rflink.protocol.create_rflink_connection',
mock_create)
# verify instanstiation of component with given config
with assert_setup_component(1, domain):
yield from async_setup_component(hass, domain, config)
# hook into mock config for injecting events
event_callback = mock_create.call_args_list[0][1]['event_callback']
assert event_callback
disconnect_callback = mock_create.call_args_list[
0][1]['disconnect_callback']
return event_callback, mock_create, protocol, disconnect_callback
@asyncio.coroutine
def test_version_banner(hass, monkeypatch):
"""Test sending unknown commands doesn't cause issues."""
# use sensor domain during testing main platform
domain = 'sensor'
config = {
'rflink': {'port': '/dev/ttyABC0', },
domain: {
'platform': 'rflink',
'devices': {
'test': {'name': 'test', 'sensor_type': 'temperature', },
},
},
}
# setup mocking rflink module
event_callback, _, _, _ = yield from mock_rflink(
hass, config, domain, monkeypatch)
event_callback({
'hardware': 'Nodo RadioFrequencyLink',
'firmware': 'RFLink Gateway',
'version': '1.1',
'revision': '45',
})
@asyncio.coroutine
def test_send_no_wait(hass, monkeypatch):
"""Test command sending without ack."""
domain = 'switch'
config = {
'rflink': {
'port': '/dev/ttyABC0',
'wait_for_ack': False,
},
domain: {
'platform': 'rflink',
'devices': {
'protocol_0_0': {
'name': 'test',
'aliasses': ['test_alias_0_0'],
},
},
},
}
# setup mocking rflink module
_, _, protocol, _ = yield from mock_rflink(
hass, config, domain, monkeypatch)
hass.async_add_job(
hass.services.async_call(domain, SERVICE_TURN_OFF,
{ATTR_ENTITY_ID: 'switch.test'}))
yield from hass.async_block_till_done()
assert protocol.send_command.call_args_list[0][0][0] == 'protocol_0_0'
assert protocol.send_command.call_args_list[0][0][1] == 'off'
@asyncio.coroutine
def test_reconnecting_after_disconnect(hass, monkeypatch):
"""An unexpected disconnect should cause a reconnect."""
domain = 'sensor'
config = {
'rflink': {
'port': '/dev/ttyABC0',
CONF_RECONNECT_INTERVAL: 0,
},
domain: {
'platform': 'rflink',
},
}
# setup mocking rflink module
_, mock_create, _, disconnect_callback = yield from mock_rflink(
hass, config, domain, monkeypatch)
assert disconnect_callback, 'disconnect callback not passed to rflink'
# rflink initiated disconnect
disconnect_callback(None)
yield from hass.async_block_till_done()
# we expect 2 call, the initial and reconnect
assert mock_create.call_count == 2
@asyncio.coroutine
def test_reconnecting_after_failure(hass, monkeypatch):
"""A failure to reconnect should be retried."""
domain = 'sensor'
config = {
'rflink': {
'port': '/dev/ttyABC0',
CONF_RECONNECT_INTERVAL: 0,
},
domain: {
'platform': 'rflink',
},
}
# success first time but fail second
failures = [False, True, False]
# setup mocking rflink module
_, mock_create, _, disconnect_callback = yield from mock_rflink(
hass, config, domain, monkeypatch, failures=failures)
# rflink initiated disconnect
disconnect_callback(None)
# wait for reconnects to have happened
yield from hass.async_block_till_done()
yield from hass.async_block_till_done()
# we expect 3 calls, the initial and 2 reconnects
assert mock_create.call_count == 3