Combine garage_door and rollershutter to cover (#2891)

* First draft for cover component

* Efficiency from @martinhjelmare

* migrate demo

* migrate demo test

* migrate command_line rollershutter

* migrate command_line test

* migrate rpi_gpio garage_door

* make some abstract methods optional

* migrate homematic

* migrate scsgate

* migrate rfxtrx and test

* migrate zwave

* migrate wink

* migrate mqtt rollershutter and test

* requirements

* coverage

* Update mqtt with garage door

* Naming and cleanup

* update test_demo.py

* update demo and core

* Add deprecated warning to rollershutter and garage_door

* Naming again

* Update

* String constants

* Make sure set_position works properly in demo too

* Make sure position is not set if not available.

* Naming, and is_closed

* Update zwave.py

* requirements

* Update test_rfxtrx.py

* fix mqtt

* requirements

* fix wink version

* Fixed demo test

* naming
This commit is contained in:
John Arild Berentsen 2016-08-24 03:23:18 +02:00 committed by Paulus Schoutsen
parent a43ea81d8e
commit cf832499cd
21 changed files with 2006 additions and 11 deletions

View File

@ -114,6 +114,10 @@ omit =
homeassistant/components/climate/knx.py
homeassistant/components/climate/proliphix.py
homeassistant/components/climate/radiotherm.py
homeassistant/components/cover/homematic.py
homeassistant/components/cover/rpi_gpio.py
homeassistant/components/cover/scsgate.py
homeassistant/components/cover/wink.py
homeassistant/components/device_tracker/actiontec.py
homeassistant/components/device_tracker/aruba.py
homeassistant/components/device_tracker/asuswrt.py

View File

@ -0,0 +1,234 @@
"""
Support for Cover devices.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/cover/
"""
import os
import logging
import voluptuous as vol
from homeassistant.config import load_yaml_config_file
from homeassistant.helpers.entity_component import EntityComponent
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.config_validation import PLATFORM_SCHEMA # noqa
import homeassistant.helpers.config_validation as cv
from homeassistant.components import group
from homeassistant.const import (
SERVICE_OPEN_COVER, SERVICE_CLOSE_COVER, SERVICE_SET_COVER_POSITION,
SERVICE_STOP_COVER, SERVICE_OPEN_COVER_TILT, SERVICE_CLOSE_COVER_TILT,
SERVICE_STOP_COVER_TILT, SERVICE_SET_COVER_TILT_POSITION, STATE_OPEN,
STATE_CLOSED, STATE_UNKNOWN, ATTR_ENTITY_ID)
DOMAIN = 'cover'
SCAN_INTERVAL = 15
GROUP_NAME_ALL_COVERS = 'all_covers'
ENTITY_ID_ALL_COVERS = group.ENTITY_ID_FORMAT.format(
GROUP_NAME_ALL_COVERS)
ENTITY_ID_FORMAT = DOMAIN + '.{}'
_LOGGER = logging.getLogger(__name__)
ATTR_CURRENT_POSITION = 'current_position'
ATTR_CURRENT_TILT_POSITION = 'current_tilt_position'
ATTR_POSITION = 'position'
ATTR_TILT_POSITION = 'tilt_position'
COVER_SERVICE_SCHEMA = vol.Schema({
vol.Optional(ATTR_ENTITY_ID): cv.entity_ids,
})
COVER_SET_COVER_POSITION_SCHEMA = COVER_SERVICE_SCHEMA.extend({
vol.Required(ATTR_POSITION):
vol.All(vol.Coerce(int), vol.Range(min=0, max=100)),
})
COVER_SET_COVER_TILT_POSITION_SCHEMA = COVER_SERVICE_SCHEMA.extend({
vol.Required(ATTR_TILT_POSITION):
vol.All(vol.Coerce(int), vol.Range(min=0, max=100)),
})
SERVICE_TO_METHOD = {
SERVICE_OPEN_COVER: {'method': 'open_cover'},
SERVICE_CLOSE_COVER: {'method': 'close_cover'},
SERVICE_SET_COVER_POSITION: {
'method': 'set_cover_position',
'schema': COVER_SET_COVER_POSITION_SCHEMA},
SERVICE_STOP_COVER: {'method': 'stop_cover'},
SERVICE_OPEN_COVER_TILT: {'method': 'open_cover_tilt'},
SERVICE_CLOSE_COVER_TILT: {'method': 'close_cover_tilt'},
SERVICE_SET_COVER_TILT_POSITION: {
'method': 'set_cover_tilt_position',
'schema': COVER_SET_COVER_TILT_POSITION_SCHEMA},
}
def is_closed(hass, entity_id=None):
"""Return if the cover is closed based on the statemachine."""
entity_id = entity_id or ENTITY_ID_ALL_COVERS
return hass.states.is_state(entity_id, STATE_CLOSED)
def open_cover(hass, entity_id=None):
"""Open all or specified cover."""
data = {ATTR_ENTITY_ID: entity_id} if entity_id else None
hass.services.call(DOMAIN, SERVICE_OPEN_COVER, data)
def close_cover(hass, entity_id=None):
"""Close all or specified cover."""
data = {ATTR_ENTITY_ID: entity_id} if entity_id else None
hass.services.call(DOMAIN, SERVICE_CLOSE_COVER, data)
def set_cover_position(hass, position, entity_id=None):
"""Move to specific position all or specified cover."""
data = {ATTR_ENTITY_ID: entity_id} if entity_id else {}
data[ATTR_POSITION] = position
hass.services.call(DOMAIN, SERVICE_SET_COVER_POSITION, data)
def stop_cover(hass, entity_id=None):
"""Stop all or specified cover."""
data = {ATTR_ENTITY_ID: entity_id} if entity_id else None
hass.services.call(DOMAIN, SERVICE_STOP_COVER, data)
def open_cover_tilt(hass, entity_id=None):
"""Open all or specified cover tilt."""
data = {ATTR_ENTITY_ID: entity_id} if entity_id else None
hass.services.call(DOMAIN, SERVICE_OPEN_COVER_TILT, data)
def close_cover_tilt(hass, entity_id=None):
"""Close all or specified cover tilt."""
data = {ATTR_ENTITY_ID: entity_id} if entity_id else None
hass.services.call(DOMAIN, SERVICE_CLOSE_COVER_TILT, data)
def set_cover_tilt_position(hass, tilt_position, entity_id=None):
"""Move to specific tilt position all or specified cover."""
data = {ATTR_ENTITY_ID: entity_id} if entity_id else {}
data[ATTR_TILT_POSITION] = tilt_position
hass.services.call(DOMAIN, SERVICE_SET_COVER_TILT_POSITION, data)
def stop_cover_tilt(hass, entity_id=None):
"""Stop all or specified cover tilt."""
data = {ATTR_ENTITY_ID: entity_id} if entity_id else None
hass.services.call(DOMAIN, SERVICE_STOP_COVER_TILT, data)
def setup(hass, config):
"""Track states and offer events for covers."""
component = EntityComponent(
_LOGGER, DOMAIN, hass, SCAN_INTERVAL, GROUP_NAME_ALL_COVERS)
component.setup(config)
def handle_cover_service(service):
"""Handle calls to the cover services."""
method = SERVICE_TO_METHOD.get(service.service)
params = service.data.copy()
params.pop(ATTR_ENTITY_ID, None)
if method:
for cover in component.extract_from_service(service):
getattr(cover, method['method'])(**params)
if cover.should_poll:
cover.update_ha_state(True)
descriptions = load_yaml_config_file(
os.path.join(os.path.dirname(__file__), 'services.yaml'))
for service_name in SERVICE_TO_METHOD:
schema = SERVICE_TO_METHOD[service_name].get(
'schema', COVER_SERVICE_SCHEMA)
hass.services.register(DOMAIN, service_name, handle_cover_service,
descriptions.get(service_name), schema=schema)
return True
class CoverDevice(Entity):
"""Representation a cover."""
# pylint: disable=no-self-use
@property
def current_cover_position(self):
"""Return current position of cover.
None is unknown, 0 is closed, 100 is fully open.
"""
return None
@property
def current_cover_tilt_position(self):
"""Return current position of cover tilt.
None is unknown, 0 is closed, 100 is fully open.
"""
return None
@property
def state(self):
"""Return the state of the cover."""
closed = self.is_closed
if closed is None:
return STATE_UNKNOWN
return STATE_CLOSED if closed else STATE_OPEN
@property
def state_attributes(self):
"""Return the state attributes."""
data = {
ATTR_CURRENT_POSITION: self.current_cover_position
}
current_tilt = self.current_cover_tilt_position
if current_tilt is not None:
data[ATTR_CURRENT_TILT_POSITION] = self.current_cover_tilt_position
return data
@property
def is_closed(self):
"""Return if the cover is closed or not."""
return NotImplementedError()
def open_cover(self, **kwargs):
"""Open the cover."""
raise NotImplementedError()
def close_cover(self, **kwargs):
"""Close cover."""
raise NotImplementedError()
def set_cover_position(self, **kwargs):
"""Move the cover to a specific position."""
pass
def stop_cover(self, **kwargs):
"""Stop the cover."""
pass
def open_cover_tilt(self, **kwargs):
"""Open the cover tilt."""
pass
def close_cover_tilt(self, **kwargs):
"""Close the cover tilt."""
pass
def set_cover_tilt_position(self, **kwargs):
"""Move the cover tilt to a specific position."""
pass
def stop_cover_tilt(self, **kwargs):
"""Stop the cover."""
pass

View File

@ -0,0 +1,128 @@
"""
Support for command line covers.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/cover.command_line/
"""
import logging
import subprocess
from homeassistant.components.cover import CoverDevice
from homeassistant.const import CONF_VALUE_TEMPLATE
from homeassistant.helpers import template
_LOGGER = logging.getLogger(__name__)
def setup_platform(hass, config, add_devices_callback, discovery_info=None):
"""Setup cover controlled by shell commands."""
covers = config.get('covers', {})
devices = []
for dev_name, properties in covers.items():
devices.append(
CommandCover(
hass,
properties.get('name', dev_name),
properties.get('opencmd', 'true'),
properties.get('closecmd', 'true'),
properties.get('stopcmd', 'true'),
properties.get('statecmd', False),
properties.get(CONF_VALUE_TEMPLATE, '{{ value }}')))
add_devices_callback(devices)
# pylint: disable=too-many-arguments, too-many-instance-attributes
class CommandCover(CoverDevice):
"""Representation a command line cover."""
# pylint: disable=too-many-arguments
def __init__(self, hass, name, command_open, command_close, command_stop,
command_state, value_template):
"""Initialize the cover."""
self._hass = hass
self._name = name
self._state = None
self._command_open = command_open
self._command_close = command_close
self._command_stop = command_stop
self._command_state = command_state
self._value_template = value_template
@staticmethod
def _move_cover(command):
"""Execute the actual commands."""
_LOGGER.info('Running command: %s', command)
success = (subprocess.call(command, shell=True) == 0)
if not success:
_LOGGER.error('Command failed: %s', command)
return success
@staticmethod
def _query_state_value(command):
"""Execute state command for return value."""
_LOGGER.info('Running state command: %s', command)
try:
return_value = subprocess.check_output(command, shell=True)
return return_value.strip().decode('utf-8')
except subprocess.CalledProcessError:
_LOGGER.error('Command failed: %s', command)
@property
def should_poll(self):
"""Only poll if we have state command."""
return self._command_state is not None
@property
def name(self):
"""Return the name of the cover."""
return self._name
@property
def is_closed(self):
"""Return if the cover is closed."""
if self.current_cover_position is not None:
if self.current_cover_position > 0:
return False
else:
return True
@property
def current_cover_position(self):
"""Return current position of cover.
None is unknown, 0 is closed, 100 is fully open.
"""
return self._state
def _query_state(self):
"""Query for the state."""
if not self._command_state:
_LOGGER.error('No state command specified')
return
return self._query_state_value(self._command_state)
def update(self):
"""Update device state."""
if self._command_state:
payload = str(self._query_state())
if self._value_template:
payload = template.render_with_possible_json_value(
self._hass, self._value_template, payload)
self._state = int(payload)
def open_cover(self, **kwargs):
"""Open the cover."""
self._move_cover(self._command_open)
def close_cover(self, **kwargs):
"""Close the cover."""
self._move_cover(self._command_close)
def stop_cover(self, **kwargs):
"""Stop the cover."""
self._move_cover(self._command_stop)

View File

@ -0,0 +1,155 @@
"""
Demo platform for the cover component.
For more details about this platform, please refer to the documentation
https://home-assistant.io/components/demo/
"""
from homeassistant.components.cover import CoverDevice
from homeassistant.const import EVENT_TIME_CHANGED
from homeassistant.helpers.event import track_utc_time_change
def setup_platform(hass, config, add_devices, discovery_info=None):
"""Setup the Demo covers."""
add_devices([
DemoCover(hass, 'Kitchen Window'),
DemoCover(hass, 'Hall Window', 10),
DemoCover(hass, 'Living Room Window', 70, 50),
])
class DemoCover(CoverDevice):
"""Representation of a demo cover."""
# pylint: disable=no-self-use, too-many-instance-attributes
def __init__(self, hass, name, position=None, tilt_position=None):
"""Initialize the cover."""
self.hass = hass
self._name = name
self._position = position
self._set_position = None
self._set_tilt_position = None
self._tilt_position = tilt_position
self._closing = True
self._closing_tilt = True
self._listener_cover = None
self._listener_cover_tilt = None
@property
def name(self):
"""Return the name of the cover."""
return self._name
@property
def should_poll(self):
"""No polling needed for a demo cover."""
return False
@property
def current_cover_position(self):
"""Return the current position of the cover."""
return self._position
@property
def current_cover_tilt_position(self):
"""Return the current tilt position of the cover."""
return self._tilt_position
def close_cover(self, **kwargs):
"""Close the cover."""
if self._position == 0:
return
self._listen_cover()
self._closing = True
def close_cover_tilt(self, **kwargs):
"""Close the cover tilt."""
if self._tilt_position == 0:
return
self._listen_cover_tilt()
self._closing_tilt = True
def open_cover(self, **kwargs):
"""Open the cover."""
if self._position == 100:
return
self._listen_cover()
self._closing = False
def open_cover_tilt(self, **kwargs):
"""Open the cover tilt."""
if self._tilt_position == 100:
return
self._listen_cover_tilt()
self._closing_tilt = False
def set_cover_position(self, position, **kwargs):
"""Move the cover to a specific position."""
self._set_position = round(position, -1)
if self._position == position:
return
self._listen_cover()
self._closing = position < self._position
def set_cover_tilt_position(self, tilt_position, **kwargs):
"""Move the cover til to a specific position."""
self._set_tilt_position = round(tilt_position, -1)
if self._tilt_position == tilt_position:
return
self._listen_cover_tilt()
self._closing_tilt = tilt_position < self._tilt_position
def stop_cover(self, **kwargs):
"""Stop the cover."""
if self._listener_cover is not None:
self.hass.bus.remove_listener(EVENT_TIME_CHANGED,
self._listener_cover)
self._listener_cover = None
self._set_position = None
def stop_cover_tilt(self, **kwargs):
"""Stop the cover tilt."""
if self._listener_cover_tilt is not None:
self.hass.bus.remove_listener(EVENT_TIME_CHANGED,
self._listener_cover_tilt)
self._listener_cover_tilt = None
self._set_tilt_position = None
def _listen_cover(self):
"""Listen for changes in cover."""
if self._listener_cover is None:
self._listener_cover = track_utc_time_change(
self.hass, self._time_changed_cover)
def _time_changed_cover(self, now):
"""Track time changes."""
if self._closing:
self._position -= 10
else:
self._position += 10
if self._position in (100, 0, self._set_position):
self.stop_cover()
self.update_ha_state()
def _listen_cover_tilt(self):
"""Listen for changes in cover tilt."""
if self._listener_cover_tilt is None:
self._listener_cover_tilt = track_utc_time_change(
self.hass, self._time_changed_cover_tilt)
def _time_changed_cover_tilt(self, now):
"""Track time changes."""
if self._closing_tilt:
self._tilt_position -= 10
else:
self._tilt_position += 10
if self._tilt_position in (100, 0, self._set_tilt_position):
self.stop_cover_tilt()
self.update_ha_state()

View File

@ -0,0 +1,101 @@
"""
The homematic cover platform.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/cover.homematic/
Important: For this platform to work the homematic component has to be
properly configured.
"""
import logging
from homeassistant.const import STATE_UNKNOWN
from homeassistant.components.cover import CoverDevice,\
ATTR_CURRENT_POSITION
import homeassistant.components.homematic as homematic
_LOGGER = logging.getLogger(__name__)
DEPENDENCIES = ['homematic']
def setup_platform(hass, config, add_callback_devices, discovery_info=None):
"""Setup the platform."""
if discovery_info is None:
return
return homematic.setup_hmdevice_discovery_helper(HMCover,
discovery_info,
add_callback_devices)
# pylint: disable=abstract-method
class HMCover(homematic.HMDevice, CoverDevice):
"""Represents a Homematic Cover in Home Assistant."""
@property
def current_cover_position(self):
"""
Return current position of cover.
None is unknown, 0 is closed, 100 is fully open.
"""
if self.available:
return int((1 - self._hm_get_state()) * 100)
return None
def set_cover_position(self, **kwargs):
"""Move the cover to a specific position."""
if self.available:
if ATTR_CURRENT_POSITION in kwargs:
position = float(kwargs[ATTR_CURRENT_POSITION])
position = min(100, max(0, position))
level = (100 - position) / 100.0
self._hmdevice.set_level(level, self._channel)
@property
def is_closed(self):
"""Return if the cover is closed."""
if self.current_cover_position is not None:
if self.current_cover_position > 0:
return False
else:
return True
def open_cover(self, **kwargs):
"""Open the cover."""
if self.available:
self._hmdevice.move_up(self._channel)
def close_cover(self, **kwargs):
"""Close the cover."""
if self.available:
self._hmdevice.move_down(self._channel)
def stop_cover(self, **kwargs):
"""Stop the device if in motion."""
if self.available:
self._hmdevice.stop(self._channel)
def _check_hm_to_ha_object(self):
"""Check if possible to use the HM Object as this HA type."""
from pyhomematic.devicetypes.actors import Blind
# Check compatibility from HMDevice
if not super()._check_hm_to_ha_object():
return False
# Check if the homematic device is correct for this HA device
if isinstance(self._hmdevice, Blind):
return True
_LOGGER.critical("This %s can't be use as cover!", self._name)
return False
def _init_data_struct(self):
"""Generate a data dict (self._data) from hm metadata."""
super()._init_data_struct()
# Add state to data dict
self._state = "LEVEL"
self._data.update({self._state: STATE_UNKNOWN})

View File

@ -0,0 +1,167 @@
"""
Support for MQTT cover devices.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/cover.mqtt/
"""
import logging
import voluptuous as vol
import homeassistant.components.mqtt as mqtt
from homeassistant.components.cover import CoverDevice
from homeassistant.const import (
CONF_NAME, CONF_VALUE_TEMPLATE, CONF_OPTIMISTIC, STATE_OPEN,
STATE_CLOSED)
from homeassistant.components.mqtt import (
CONF_STATE_TOPIC, CONF_COMMAND_TOPIC, CONF_QOS, CONF_RETAIN)
from homeassistant.helpers import template
import homeassistant.helpers.config_validation as cv
_LOGGER = logging.getLogger(__name__)
DEPENDENCIES = ['mqtt']
CONF_PAYLOAD_OPEN = 'payload_open'
CONF_PAYLOAD_CLOSE = 'payload_close'
CONF_PAYLOAD_STOP = 'payload_stop'
CONF_STATE_OPEN = 'state_open'
CONF_STATE_CLOSED = 'state_closed'
DEFAULT_NAME = "MQTT Cover"
DEFAULT_PAYLOAD_OPEN = "OPEN"
DEFAULT_PAYLOAD_CLOSE = "CLOSE"
DEFAULT_PAYLOAD_STOP = "STOP"
DEFAULT_OPTIMISTIC = False
DEFAULT_RETAIN = False
PLATFORM_SCHEMA = mqtt.MQTT_RW_PLATFORM_SCHEMA.extend({
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_PAYLOAD_OPEN, default=DEFAULT_PAYLOAD_OPEN): cv.string,
vol.Optional(CONF_PAYLOAD_CLOSE, default=DEFAULT_PAYLOAD_CLOSE): cv.string,
vol.Optional(CONF_PAYLOAD_STOP, default=DEFAULT_PAYLOAD_STOP): cv.string,
vol.Optional(CONF_STATE_OPEN, default=STATE_OPEN): cv.string,
vol.Optional(CONF_STATE_CLOSED, default=STATE_CLOSED): cv.string,
vol.Optional(CONF_OPTIMISTIC, default=DEFAULT_OPTIMISTIC): cv.boolean,
vol.Optional(CONF_RETAIN, default=DEFAULT_RETAIN): cv.boolean,
})
def setup_platform(hass, config, add_devices_callback, discovery_info=None):
"""Add MQTT Cover."""
add_devices_callback([MqttCover(
hass,
config[CONF_NAME],
config.get(CONF_STATE_TOPIC),
config[CONF_COMMAND_TOPIC],
config[CONF_QOS],
config[CONF_RETAIN],
config[CONF_STATE_OPEN],
config[CONF_STATE_CLOSED],
config[CONF_PAYLOAD_OPEN],
config[CONF_PAYLOAD_CLOSE],
config[CONF_PAYLOAD_STOP],
config[CONF_OPTIMISTIC],
config.get(CONF_VALUE_TEMPLATE)
)])
# pylint: disable=too-many-arguments, too-many-instance-attributes
class MqttCover(CoverDevice):
"""Representation of a cover that can be controlled using MQTT."""
def __init__(self, hass, name, state_topic, command_topic, qos,
retain, state_open, state_closed, payload_open, payload_close,
payload_stop, optimistic, value_template):
"""Initialize the cover."""
self._position = None
self._state = None
self._hass = hass
self._name = name
self._state_topic = state_topic
self._command_topic = command_topic
self._qos = qos
self._payload_open = payload_open
self._payload_close = payload_close
self._payload_stop = payload_stop
self._state_open = state_open
self._state_closed = state_closed
self._retain = retain
self._optimistic = optimistic or state_topic is None
def message_received(topic, payload, qos):
"""A new MQTT message has been received."""
if value_template is not None:
payload = template.render_with_possible_json_value(
hass, value_template, payload)
if payload == self._state_open:
self._state = False
self.update_ha_state()
elif payload == self._state_closed:
self._state = True
self.update_ha_state()
elif payload.isnumeric() and 0 <= int(payload) <= 100:
self._state = int(payload)
self._position = int(payload)
self.update_ha_state()
else:
_LOGGER.warning(
"Payload is not True or False or"
" integer(0-100) %s", payload)
if self._state_topic is None:
# Force into optimistic mode.
self._optimistic = True
else:
mqtt.subscribe(hass, self._state_topic, message_received,
self._qos)
@property
def should_poll(self):
"""No polling needed."""
return False
@property
def name(self):
"""Return the name of the cover."""
return self._name
@property
def is_closed(self):
"""Return if the cover is closed."""
if self.current_cover_position is not None:
if self.current_cover_position > 0:
return False
else:
return True
@property
def current_cover_position(self):
"""Return current position of cover.
None is unknown, 0 is closed, 100 is fully open.
"""
return self._position
def open_cover(self, **kwargs):
"""Move the cover up."""
mqtt.publish(self.hass, self._command_topic, self._payload_open,
self._qos, self._retain)
if self._optimistic:
# Optimistically assume that cover has changed state.
self._state = 100
self.update_ha_state()
def close_cover(self, **kwargs):
"""Move the cover down."""
mqtt.publish(self.hass, self._command_topic, self._payload_close,
self._qos, self._retain)
if self._optimistic:
# Optimistically assume that cover has changed state.
self._state = 0
self.update_ha_state()
def stop_cover(self, **kwargs):
"""Stop the device."""
mqtt.publish(self.hass, self._command_topic, self._payload_stop,
self._qos, self._retain)

View File

@ -0,0 +1,67 @@
"""
Support for RFXtrx cover components.
For more details about this platform, please refer to the documentation
https://home-assistant.io/components/cover.rfxtrx/
"""
import homeassistant.components.rfxtrx as rfxtrx
from homeassistant.components.cover import CoverDevice
DEPENDENCIES = ['rfxtrx']
PLATFORM_SCHEMA = rfxtrx.DEFAULT_SCHEMA
def setup_platform(hass, config, add_devices_callback, discovery_info=None):
"""Setup the RFXtrx cover."""
import RFXtrx as rfxtrxmod
# Add cover from config file
covers = rfxtrx.get_devices_from_config(config,
RfxtrxCover)
add_devices_callback(covers)
def cover_update(event):
"""Callback for cover updates from the RFXtrx gateway."""
if not isinstance(event.device, rfxtrxmod.LightingDevice) or \
event.device.known_to_be_dimmable or \
not event.device.known_to_be_rollershutter:
return
new_device = rfxtrx.get_new_device(event, config, RfxtrxCover)
if new_device:
add_devices_callback([new_device])
rfxtrx.apply_received_command(event)
# Subscribe to main rfxtrx events
if cover_update not in rfxtrx.RECEIVED_EVT_SUBSCRIBERS:
rfxtrx.RECEIVED_EVT_SUBSCRIBERS.append(cover_update)
# pylint: disable=abstract-method
class RfxtrxCover(rfxtrx.RfxtrxDevice, CoverDevice):
"""Representation of an rfxtrx cover."""
@property
def should_poll(self):
"""No polling available in rfxtrx cover."""
return False
@property
def is_closed(self):
"""Return if the cover is closed."""
return None
def open_cover(self, **kwargs):
"""Move the cover up."""
self._send_command("roll_up")
def close_cover(self, **kwargs):
"""Move the cover down."""
self._send_command("roll_down")
def stop_cover(self, **kwargs):
"""Stop the cover."""
self._send_command("stop_roll")

View File

@ -0,0 +1,98 @@
"""
Support for building a Raspberry Pi cover in HA.
Instructions for building the controller can be found here
https://github.com/andrewshilliday/garage-door-controller
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/cover.rpi_gpio/
"""
import logging
from time import sleep
import voluptuous as vol
from homeassistant.components.cover import CoverDevice
import homeassistant.components.rpi_gpio as rpi_gpio
import homeassistant.helpers.config_validation as cv
DEPENDENCIES = ['rpi_gpio']
_LOGGER = logging.getLogger(__name__)
_COVERS_SCHEMA = vol.All(
cv.ensure_list,
[
vol.Schema({
'name': str,
'relay_pin': int,
'state_pin': int,
})
]
)
PLATFORM_SCHEMA = vol.Schema({
'platform': str,
vol.Required('covers'): _COVERS_SCHEMA,
})
# pylint: disable=unused-argument
def setup_platform(hass, config, add_devices, discovery_info=None):
"""Setup the cover platform."""
covers = []
covers_conf = config.get('covers')
for cover in covers_conf:
covers.append(RPiGPIOCover(cover['name'], cover['relay_pin'],
cover['state_pin']))
add_devices(covers)
# pylint: disable=abstract-method
class RPiGPIOCover(CoverDevice):
"""Representation of a Raspberry cover."""
def __init__(self, name, relay_pin, state_pin):
"""Initialize the cover."""
self._name = name
self._state = False
self._relay_pin = relay_pin
self._state_pin = state_pin
rpi_gpio.setup_output(self._relay_pin)
rpi_gpio.setup_input(self._state_pin, 'UP')
rpi_gpio.write_output(self._relay_pin, True)
@property
def unique_id(self):
"""Return the ID of this cover."""
return "{}.{}".format(self.__class__, self._name)
@property
def name(self):
"""Return the name of the cover if any."""
return self._name
def update(self):
"""Update the state of the cover."""
self._state = rpi_gpio.read_input(self._state_pin)
@property
def is_closed(self):
"""Return true if cover is closed."""
return self._state
def _trigger(self):
"""Trigger the cover."""
rpi_gpio.write_output(self._relay_pin, False)
sleep(0.2)
rpi_gpio.write_output(self._relay_pin, True)
def close_cover(self):
"""Close the cover."""
if not self.is_closed:
self._trigger()
def open_cover(self):
"""Open the cover."""
if self.is_closed:
self._trigger()

View File

@ -0,0 +1,96 @@
"""
Allow to configure a SCSGate cover.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/cover.scsgate/
"""
import logging
import homeassistant.components.scsgate as scsgate
from homeassistant.components.cover import CoverDevice
from homeassistant.const import CONF_NAME
DEPENDENCIES = ['scsgate']
SCS_ID = 'scs_id'
def setup_platform(hass, config, add_devices_callback, discovery_info=None):
"""Setup the SCSGate cover."""
devices = config.get('devices')
covers = []
logger = logging.getLogger(__name__)
if devices:
for _, entity_info in devices.items():
if entity_info[SCS_ID] in scsgate.SCSGATE.devices:
continue
logger.info("Adding %s scsgate.cover", entity_info[CONF_NAME])
name = entity_info[CONF_NAME]
scs_id = entity_info[SCS_ID]
cover = SCSGateCover(
name=name,
scs_id=scs_id,
logger=logger)
scsgate.SCSGATE.add_device(cover)
covers.append(cover)
add_devices_callback(covers)
# pylint: disable=too-many-arguments, too-many-instance-attributes
class SCSGateCover(CoverDevice):
"""Representation of SCSGate cover."""
def __init__(self, scs_id, name, logger):
"""Initialize the cover."""
self._scs_id = scs_id
self._name = name
self._logger = logger
@property
def scs_id(self):
"""Return the SCSGate ID."""
return self._scs_id
@property
def should_poll(self):
"""No polling needed."""
return False
@property
def name(self):
"""Return the name of the cover."""
return self._name
@property
def is_closed(self):
"""Return if the cover is closed."""
return None
def open_cover(self, **kwargs):
"""Move the cover."""
from scsgate.tasks import RaiseRollerShutterTask
scsgate.SCSGATE.append_task(
RaiseRollerShutterTask(target=self._scs_id))
def close_cover(self, **kwargs):
"""Move the cover down."""
from scsgate.tasks import LowerRollerShutterTask
scsgate.SCSGATE.append_task(
LowerRollerShutterTask(target=self._scs_id))
def stop_cover(self, **kwargs):
"""Stop the cover."""
from scsgate.tasks import HaltRollerShutterTask
scsgate.SCSGATE.append_task(HaltRollerShutterTask(target=self._scs_id))
def process_event(self, message):
"""Handle a SCSGate message related with this cover."""
self._logger.debug(
"Rollershutter %s, got message %s",
self._scs_id, message.toggled)

View File

@ -0,0 +1,71 @@
open_cover:
description: Open all or specified cover
fields:
entity_id:
description: Name(s) of cover(s) to open
example: 'cover.living_room'
close_cover:
description: Close all or specified cover
fields:
entity_id:
description: Name(s) of cover(s) to close
example: 'cover.living_room'
set_cover_position:
description: Move to specific position all or specified cover
fields:
entity_id:
description: Name(s) of cover(s) to set cover position
example: 'cover.living_room'
position:
description: Position of the cover (0 to 100)
example: 30
stop_cover:
description: Stop all or specified cover
fields:
entity_id:
description: Name(s) of cover(s) to stop
example: 'cover.living_room'
open_cover_tilt:
description: Open all or specified cover tilt
fields:
entity_id:
description: Name(s) of cover(s) tilt to open
example: 'cover.living_room'
close_cover_tilt:
description: Close all or specified cover tilt
fields:
entity_id:
description: Name(s) of cover(s) to close tilt
example: 'cover.living_room'
set_cover_tilt_position:
description: Move to specific position all or specified cover tilt
fields:
entity_id:
description: Name(s) of cover(s) to set cover tilt position
example: 'cover.living_room'
position:
description: Position of the cover (0 to 100)
example: 30
stop_cover_tilt:
description: Stop all or specified cover
fields:
entity_id:
description: Name(s) of cover(s) to stop
example: 'cover.living_room'

View File

@ -0,0 +1,64 @@
"""
Support for Wink Covers.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/cover.wink/
"""
import logging
from homeassistant.components.cover import CoverDevice
from homeassistant.components.wink import WinkDevice
from homeassistant.const import CONF_ACCESS_TOKEN
REQUIREMENTS = ['python-wink==0.7.13', 'pubnub==3.8.2']
def setup_platform(hass, config, add_devices, discovery_info=None):
"""Setup the Wink cover platform."""
import pywink
if discovery_info is None:
token = config.get(CONF_ACCESS_TOKEN)
if token is None:
logging.getLogger(__name__).error(
"Missing wink access_token. "
"Get one at https://winkbearertoken.appspot.com/")
return
pywink.set_bearer_token(token)
add_devices(WinkCoverDevice(shade) for shade, door in
pywink.get_shades())
class WinkCoverDevice(WinkDevice, CoverDevice):
"""Representation of a Wink covers."""
def __init__(self, wink):
"""Initialize the cover."""
WinkDevice.__init__(self, wink)
@property
def should_poll(self):
"""Wink Shades don't track their position."""
return False
def close_cover(self):
"""Close the shade."""
self.wink.set_state(0)
def open_cover(self):
"""Open the shade."""
self.wink.set_state(1)
@property
def is_closed(self):
"""Return if the cover is closed."""
state = self.wink.state()
if state == 0:
return True
elif state == 1:
return False
else:
return None

View File

@ -0,0 +1,184 @@
"""
Support for Zwave cover components.
For more details about this platform, please refer to the documentation
https://home-assistant.io/components/cover.zwave/
"""
# Because we do not compile openzwave on CI
# pylint: disable=import-error
import logging
from homeassistant.components.cover import DOMAIN
from homeassistant.components.zwave import ZWaveDeviceEntity
from homeassistant.components import zwave
from homeassistant.components.cover import CoverDevice
COMMAND_CLASS_SWITCH_MULTILEVEL = 0x26 # 38
COMMAND_CLASS_SWITCH_BINARY = 0x25 # 37
SOMFY = 0x47
SOMFY_ZRTSI = 0x5a52
SOMFY_ZRTSI_CONTROLLER = (SOMFY, SOMFY_ZRTSI)
WORKAROUND = 'workaround'
DEVICE_MAPPINGS = {
SOMFY_ZRTSI_CONTROLLER: WORKAROUND
}
_LOGGER = logging.getLogger(__name__)
def setup_platform(hass, config, add_devices, discovery_info=None):
"""Find and return Z-Wave covers."""
if discovery_info is None or zwave.NETWORK is None:
return
node = zwave.NETWORK.nodes[discovery_info[zwave.ATTR_NODE_ID]]
value = node.values[discovery_info[zwave.ATTR_VALUE_ID]]
if (value.command_class == zwave.COMMAND_CLASS_SWITCH_MULTILEVEL and
value.index == 0):
value.set_change_verified(False)
add_devices([ZwaveRollershutter(value)])
elif (value.command_class == zwave.COMMAND_CLASS_SWITCH_BINARY or
value.command_class == zwave.COMMAND_CLASS_BARRIER_OPERATOR):
if value.type != zwave.TYPE_BOOL and \
value.genre != zwave.GENRE_USER:
return
value.set_change_verified(False)
add_devices([ZwaveGarageDoor(value)])
else:
return
class ZwaveRollershutter(zwave.ZWaveDeviceEntity, CoverDevice):
"""Representation of an Zwave roller shutter."""
def __init__(self, value):
"""Initialize the zwave rollershutter."""
import libopenzwave
from openzwave.network import ZWaveNetwork
from pydispatch import dispatcher
ZWaveDeviceEntity.__init__(self, value, DOMAIN)
self._lozwmgr = libopenzwave.PyManager()
self._lozwmgr.create()
self._node = value.node
self._current_position = None
self._workaround = None
dispatcher.connect(
self.value_changed, ZWaveNetwork.SIGNAL_VALUE_CHANGED)
if (value.node.manufacturer_id.strip() and
value.node.product_id.strip()):
specific_sensor_key = (int(value.node.manufacturer_id, 16),
int(value.node.product_type, 16))
if specific_sensor_key in DEVICE_MAPPINGS:
if DEVICE_MAPPINGS[specific_sensor_key] == WORKAROUND:
_LOGGER.debug("Controller without positioning feedback")
self._workaround = 1
def value_changed(self, value):
"""Called when a value has changed on the network."""
if self._value.value_id == value.value_id or \
self._value.node == value.node:
self.update_properties()
self.update_ha_state()
_LOGGER.debug("Value changed on network %s", value)
def update_properties(self):
"""Callback on data change for the registered node/value pair."""
# Position value
for value in self._node.get_values(
class_id=COMMAND_CLASS_SWITCH_MULTILEVEL).values():
if value.command_class == zwave.COMMAND_CLASS_SWITCH_MULTILEVEL \
and value.label == 'Level':
self._current_position = value.data
@property
def is_closed(self):
"""Return if the cover is closed."""
if self.current_cover_position > 0:
return False
else:
return True
@property
def current_cover_position(self):
"""Return the current position of Zwave roller shutter."""
if not self._workaround:
if self._current_position is not None:
if self._current_position <= 5:
return 0
elif self._current_position >= 95:
return 100
else:
return self._current_position
def open_cover(self, **kwargs):
"""Move the roller shutter up."""
for value in self._node.get_values(
class_id=COMMAND_CLASS_SWITCH_MULTILEVEL).values():
if value.command_class == zwave.COMMAND_CLASS_SWITCH_MULTILEVEL \
and value.label == 'Open' or \
value.command_class == zwave.COMMAND_CLASS_SWITCH_MULTILEVEL \
and value.label == 'Down':
self._lozwmgr.pressButton(value.value_id)
break
def close_cover(self, **kwargs):
"""Move the roller shutter down."""
for value in self._node.get_values(
class_id=COMMAND_CLASS_SWITCH_MULTILEVEL).values():
if value.command_class == zwave.COMMAND_CLASS_SWITCH_MULTILEVEL \
and value.label == 'Up' or \
value.command_class == zwave.COMMAND_CLASS_SWITCH_MULTILEVEL \
and value.label == 'Close':
self._lozwmgr.pressButton(value.value_id)
break
def set_cover_position(self, position, **kwargs):
"""Move the roller shutter to a specific position."""
self._node.set_dimmer(self._value.value_id, 100 - position)
def stop_cover(self, **kwargs):
"""Stop the roller shutter."""
for value in self._node.get_values(
class_id=COMMAND_CLASS_SWITCH_MULTILEVEL).values():
if value.command_class == zwave.COMMAND_CLASS_SWITCH_MULTILEVEL \
and value.label == 'Open' or \
value.command_class == zwave.COMMAND_CLASS_SWITCH_MULTILEVEL \
and value.label == 'Down':
self._lozwmgr.releaseButton(value.value_id)
break
class ZwaveGarageDoor(zwave.ZWaveDeviceEntity, CoverDevice):
"""Representation of an Zwave garage door device."""
def __init__(self, value):
"""Initialize the zwave garage door."""
from openzwave.network import ZWaveNetwork
from pydispatch import dispatcher
ZWaveDeviceEntity.__init__(self, value, DOMAIN)
self._state = value.data
dispatcher.connect(
self.value_changed, ZWaveNetwork.SIGNAL_VALUE_CHANGED)
def value_changed(self, value):
"""Called when a value has changed on the network."""
if self._value.value_id == value.value_id:
self._state = value.data
self.update_ha_state()
_LOGGER.debug("Value changed on network %s", value)
@property
def is_closed(self):
"""Return the current position of Zwave garage door."""
return not self._state
def close_cover(self):
"""Close the garage door."""
self._value.data = False
def open_cover(self):
"""Open the garage door."""
self._value.data = True

View File

@ -54,6 +54,9 @@ def open_door(hass, entity_id=None):
def setup(hass, config):
"""Track states and offer events for garage door."""
_LOGGER.warning('This component has been deprecated in favour of the '
'"cover" component and will be removed in the future.'
' Please upgrade.')
component = EntityComponent(
_LOGGER, DOMAIN, hass, SCAN_INTERVAL, GROUP_NAME_ALL_GARAGE_DOORS)
component.setup(config)

View File

@ -77,6 +77,9 @@ def stop(hass, entity_id=None):
def setup(hass, config):
"""Track states and offer events for roller shutters."""
_LOGGER.warning('This component has been deprecated in favour of the '
'"cover" component and will be removed in the future.'
' Please upgrade.')
component = EntityComponent(
_LOGGER, DOMAIN, hass, SCAN_INTERVAL, GROUP_NAME_ALL_ROLLERSHUTTERS)
component.setup(config)

View File

@ -162,22 +162,19 @@ DISCOVERY_COMPONENTS = [
[COMMAND_CLASS_DOOR_LOCK],
TYPE_BOOL,
GENRE_USER),
('rollershutter',
[GENERIC_COMMAND_CLASS_MULTILEVEL_SWITCH],
('cover',
[GENERIC_COMMAND_CLASS_MULTILEVEL_SWITCH,
GENERIC_COMMAND_CLASS_ENTRY_CONTROL],
[SPECIFIC_DEVICE_CLASS_MOTOR_CONTROL_CLASS_A,
SPECIFIC_DEVICE_CLASS_MOTOR_CONTROL_CLASS_B,
SPECIFIC_DEVICE_CLASS_MOTOR_CONTROL_CLASS_C,
SPECIFIC_DEVICE_CLASS_MULTIPOSITION_MOTOR],
[COMMAND_CLASS_WHATEVER],
TYPE_WHATEVER,
GENRE_USER),
('garage_door',
[GENERIC_COMMAND_CLASS_ENTRY_CONTROL],
[SPECIFIC_DEVICE_CLASS_SECURE_BARRIER_ADD_ON,
SPECIFIC_DEVICE_CLASS_MULTIPOSITION_MOTOR,
SPECIFIC_DEVICE_CLASS_SECURE_BARRIER_ADD_ON,
SPECIFIC_DEVICE_CLASS_SECURE_DOOR],
[COMMAND_CLASS_SWITCH_BINARY,
COMMAND_CLASS_BARRIER_OPERATOR],
TYPE_BOOL,
COMMAND_CLASS_BARRIER_OPERATOR,
COMMAND_CLASS_SWITCH_MULTILEVEL],
TYPE_WHATEVER,
GENRE_USER),
('climate',
[GENERIC_COMMAND_CLASS_THERMOSTAT],

View File

@ -233,6 +233,15 @@ SERVICE_UNLOCK = 'unlock'
SERVICE_OPEN = 'open'
SERVICE_CLOSE = 'close'
SERVICE_CLOSE_COVER = 'close_cover'
SERVICE_CLOSE_COVER_TILT = 'close_cover_tilt'
SERVICE_OPEN_COVER = 'open_cover'
SERVICE_OPEN_COVER_TILT = 'open_cover_tilt'
SERVICE_SET_COVER_POSITION = 'set_cover_position'
SERVICE_SET_COVER_TILT_POSITION = 'set_cover_tilt_position'
SERVICE_STOP_COVER = 'stop'
SERVICE_STOP_COVER_TILT = 'stop_cover_tilt'
SERVICE_MOVE_UP = 'move_up'
SERVICE_MOVE_DOWN = 'move_down'
SERVICE_MOVE_POSITION = 'move_position'

View File

@ -273,6 +273,7 @@ psutil==4.3.0
# homeassistant.components.wink
# homeassistant.components.binary_sensor.wink
# homeassistant.components.cover.wink
# homeassistant.components.garage_door.wink
# homeassistant.components.light.wink
# homeassistant.components.lock.wink
@ -382,6 +383,7 @@ python-twitch==1.3.0
# homeassistant.components.wink
# homeassistant.components.binary_sensor.wink
# homeassistant.components.cover.wink
# homeassistant.components.garage_door.wink
# homeassistant.components.light.wink
# homeassistant.components.lock.wink

View File

@ -0,0 +1,84 @@
"""The tests the cover command line platform."""
import os
import tempfile
import unittest
from unittest import mock
import homeassistant.core as ha
import homeassistant.components.cover as cover
from homeassistant.components.cover import (
command_line as cmd_rs)
class TestCommandCover(unittest.TestCase):
"""Test the cover command line platform."""
def setup_method(self, method):
"""Setup things to be run when tests are started."""
self.hass = ha.HomeAssistant()
self.hass.config.latitude = 32.87336
self.hass.config.longitude = 117.22743
self.rs = cmd_rs.CommandCover(self.hass, 'foo',
'cmd_open', 'cmd_close',
'cmd_stop', 'cmd_state',
None) # FIXME
def teardown_method(self, method):
"""Stop down everything that was started."""
self.hass.stop()
def test_should_poll(self):
"""Test the setting of polling."""
self.assertTrue(self.rs.should_poll)
self.rs._command_state = None
self.assertFalse(self.rs.should_poll)
def test_query_state_value(self):
"""Test with state value."""
with mock.patch('subprocess.check_output') as mock_run:
mock_run.return_value = b' foo bar '
result = self.rs._query_state_value('runme')
self.assertEqual('foo bar', result)
mock_run.assert_called_once_with('runme', shell=True)
def test_state_value(self):
"""Test with state value."""
with tempfile.TemporaryDirectory() as tempdirname:
path = os.path.join(tempdirname, 'cover_status')
test_cover = {
'statecmd': 'cat {}'.format(path),
'opencmd': 'echo 1 > {}'.format(path),
'closecmd': 'echo 1 > {}'.format(path),
'stopcmd': 'echo 0 > {}'.format(path),
'value_template': '{{ value }}'
}
self.assertTrue(cover.setup(self.hass, {
'cover': {
'platform': 'command_line',
'covers': {
'test': test_cover
}
}
}))
state = self.hass.states.get('cover.test')
self.assertEqual('unknown', state.state)
cover.open_cover(self.hass, 'cover.test')
self.hass.pool.block_till_done()
state = self.hass.states.get('cover.test')
self.assertEqual('open', state.state)
cover.close_cover(self.hass, 'cover.test')
self.hass.pool.block_till_done()
state = self.hass.states.get('cover.test')
self.assertEqual('open', state.state)
cover.stop_cover(self.hass, 'cover.test')
self.hass.pool.block_till_done()
state = self.hass.states.get('cover.test')
self.assertEqual('closed', state.state)

View File

@ -0,0 +1,138 @@
"""The tests for the Demo cover platform."""
import unittest
from datetime import timedelta
import homeassistant.util.dt as dt_util
from homeassistant.components import cover
from tests.common import get_test_home_assistant, fire_time_changed
ENTITY_COVER = 'cover.living_room_window'
class TestCoverDemo(unittest.TestCase):
"""Test the Demo cover."""
def setUp(self): # pylint: disable=invalid-name
"""Setup things to be run when tests are started."""
self.hass = get_test_home_assistant()
self.assertTrue(cover.setup(self.hass, {'cover': {
'platform': 'demo',
}}))
def tearDown(self): # pylint: disable=invalid-name
"""Stop down everything that was started."""
self.hass.stop()
def test_close_cover(self):
"""Test closing the cover."""
state = self.hass.states.get(ENTITY_COVER)
self.assertEqual(70, state.attributes.get('current_position'))
cover.close_cover(self.hass, ENTITY_COVER)
self.hass.pool.block_till_done()
for _ in range(7):
future = dt_util.utcnow() + timedelta(seconds=1)
fire_time_changed(self.hass, future)
self.hass.pool.block_till_done()
state = self.hass.states.get(ENTITY_COVER)
self.assertEqual(0, state.attributes.get('current_position'))
def test_open_cover(self):
"""Test opening the cover."""
state = self.hass.states.get(ENTITY_COVER)
self.assertEqual(70, state.attributes.get('current_position'))
cover.open_cover(self.hass, ENTITY_COVER)
self.hass.pool.block_till_done()
for _ in range(7):
future = dt_util.utcnow() + timedelta(seconds=1)
fire_time_changed(self.hass, future)
self.hass.pool.block_till_done()
state = self.hass.states.get(ENTITY_COVER)
self.assertEqual(100, state.attributes.get('current_position'))
def test_set_cover_position(self):
"""Test moving the cover to a specific position."""
state = self.hass.states.get(ENTITY_COVER)
self.assertEqual(70, state.attributes.get('current_position'))
cover.set_cover_position(self.hass, 10, ENTITY_COVER)
self.hass.pool.block_till_done()
for _ in range(6):
future = dt_util.utcnow() + timedelta(seconds=1)
fire_time_changed(self.hass, future)
self.hass.pool.block_till_done()
state = self.hass.states.get(ENTITY_COVER)
self.assertEqual(10, state.attributes.get('current_position'))
def test_stop_cover(self):
"""Test stopping the cover."""
state = self.hass.states.get(ENTITY_COVER)
self.assertEqual(70, state.attributes.get('current_position'))
cover.open_cover(self.hass, ENTITY_COVER)
self.hass.pool.block_till_done()
future = dt_util.utcnow() + timedelta(seconds=1)
fire_time_changed(self.hass, future)
self.hass.pool.block_till_done()
cover.stop_cover(self.hass, ENTITY_COVER)
self.hass.pool.block_till_done()
fire_time_changed(self.hass, future)
state = self.hass.states.get(ENTITY_COVER)
self.assertEqual(80, state.attributes.get('current_position'))
def test_close_cover_tilt(self):
"""Test closing the cover tilt."""
state = self.hass.states.get(ENTITY_COVER)
self.assertEqual(50, state.attributes.get('current_tilt_position'))
cover.close_cover_tilt(self.hass, ENTITY_COVER)
self.hass.pool.block_till_done()
for _ in range(7):
future = dt_util.utcnow() + timedelta(seconds=1)
fire_time_changed(self.hass, future)
self.hass.pool.block_till_done()
state = self.hass.states.get(ENTITY_COVER)
self.assertEqual(0, state.attributes.get('current_tilt_position'))
def test_open_cover_tilt(self):
"""Test opening the cover tilt."""
state = self.hass.states.get(ENTITY_COVER)
self.assertEqual(50, state.attributes.get('current_tilt_position'))
cover.open_cover_tilt(self.hass, ENTITY_COVER)
self.hass.pool.block_till_done()
for _ in range(7):
future = dt_util.utcnow() + timedelta(seconds=1)
fire_time_changed(self.hass, future)
self.hass.pool.block_till_done()
state = self.hass.states.get(ENTITY_COVER)
self.assertEqual(100, state.attributes.get('current_tilt_position'))
def test_set_cover_tilt_position(self):
"""Test moving the cover til to a specific position."""
state = self.hass.states.get(ENTITY_COVER)
self.assertEqual(50, state.attributes.get('current_tilt_position'))
cover.set_cover_tilt_position(self.hass, 90, ENTITY_COVER)
self.hass.pool.block_till_done()
for _ in range(7):
future = dt_util.utcnow() + timedelta(seconds=1)
fire_time_changed(self.hass, future)
self.hass.pool.block_till_done()
state = self.hass.states.get(ENTITY_COVER)
self.assertEqual(90, state.attributes.get('current_tilt_position'))
def test_stop_cover_tilt(self):
"""Test stopping the cover tilt."""
state = self.hass.states.get(ENTITY_COVER)
self.assertEqual(50, state.attributes.get('current_tilt_position'))
cover.close_cover_tilt(self.hass, ENTITY_COVER)
self.hass.pool.block_till_done()
future = dt_util.utcnow() + timedelta(seconds=1)
fire_time_changed(self.hass, future)
self.hass.pool.block_till_done()
cover.stop_cover_tilt(self.hass, ENTITY_COVER)
self.hass.pool.block_till_done()
fire_time_changed(self.hass, future)
state = self.hass.states.get(ENTITY_COVER)
self.assertEqual(40, state.attributes.get('current_tilt_position'))

View File

@ -0,0 +1,174 @@
"""The tests for the MQTT cover platform."""
import unittest
from homeassistant.bootstrap import _setup_component
from homeassistant.const import STATE_OPEN, STATE_CLOSED, STATE_UNKNOWN
import homeassistant.components.cover as cover
from tests.common import mock_mqtt_component, fire_mqtt_message
from tests.common import get_test_home_assistant
class TestCoverMQTT(unittest.TestCase):
"""Test the MQTT cover."""
def setUp(self): # pylint: disable=invalid-name
"""Setup things to be run when tests are started."""
self.hass = get_test_home_assistant()
self.mock_publish = mock_mqtt_component(self.hass)
def tearDown(self): # pylint: disable=invalid-name
"""Stop down everything that was started."""
self.hass.stop()
def test_controlling_state_via_topic(self):
"""Test the controlling state via topic."""
self.hass.config.components = ['mqtt']
assert _setup_component(self.hass, cover.DOMAIN, {
cover.DOMAIN: {
'platform': 'mqtt',
'name': 'test',
'state_topic': 'state-topic',
'command_topic': 'command-topic',
'qos': 0,
'payload_open': 'OPEN',
'payload_close': 'CLOSE',
'payload_stop': 'STOP'
}
})
state = self.hass.states.get('cover.test')
self.assertEqual(STATE_UNKNOWN, state.state)
fire_mqtt_message(self.hass, 'state-topic', '0')
self.hass.pool.block_till_done()
state = self.hass.states.get('cover.test')
self.assertEqual(STATE_CLOSED, state.state)
fire_mqtt_message(self.hass, 'state-topic', '50')
self.hass.pool.block_till_done()
state = self.hass.states.get('cover.test')
self.assertEqual(STATE_OPEN, state.state)
fire_mqtt_message(self.hass, 'state-topic', '100')
self.hass.pool.block_till_done()
state = self.hass.states.get('cover.test')
self.assertEqual(STATE_OPEN, state.state)
def test_send_open_cover_command(self):
"""Test the sending of open_cover."""
self.hass.config.components = ['mqtt']
assert _setup_component(self.hass, cover.DOMAIN, {
cover.DOMAIN: {
'platform': 'mqtt',
'name': 'test',
'state_topic': 'state-topic',
'command_topic': 'command-topic',
'qos': 2
}
})
state = self.hass.states.get('cover.test')
self.assertEqual(STATE_UNKNOWN, state.state)
cover.open_cover(self.hass, 'cover.test')
self.hass.pool.block_till_done()
self.assertEqual(('command-topic', 'OPEN', 2, False),
self.mock_publish.mock_calls[-1][1])
state = self.hass.states.get('cover.test')
self.assertEqual(STATE_UNKNOWN, state.state)
def test_send_close_cover_command(self):
"""Test the sending of close_cover."""
self.hass.config.components = ['mqtt']
assert _setup_component(self.hass, cover.DOMAIN, {
cover.DOMAIN: {
'platform': 'mqtt',
'name': 'test',
'state_topic': 'state-topic',
'command_topic': 'command-topic',
'qos': 2
}
})
state = self.hass.states.get('cover.test')
self.assertEqual(STATE_UNKNOWN, state.state)
cover.close_cover(self.hass, 'cover.test')
self.hass.pool.block_till_done()
self.assertEqual(('command-topic', 'CLOSE', 2, False),
self.mock_publish.mock_calls[-1][1])
state = self.hass.states.get('cover.test')
self.assertEqual(STATE_UNKNOWN, state.state)
def test_send_stop__cover_command(self):
"""Test the sending of stop_cover."""
self.hass.config.components = ['mqtt']
assert _setup_component(self.hass, cover.DOMAIN, {
cover.DOMAIN: {
'platform': 'mqtt',
'name': 'test',
'state_topic': 'state-topic',
'command_topic': 'command-topic',
'qos': 2
}
})
state = self.hass.states.get('cover.test')
self.assertEqual(STATE_UNKNOWN, state.state)
cover.stop_cover(self.hass, 'cover.test')
self.hass.pool.block_till_done()
self.assertEqual(('command-topic', 'STOP', 2, False),
self.mock_publish.mock_calls[-1][1])
state = self.hass.states.get('cover.test')
self.assertEqual(STATE_UNKNOWN, state.state)
def test_state_attributes_current_cover_position(self):
"""Test the current cover position."""
self.hass.config.components = ['mqtt']
assert _setup_component(self.hass, cover.DOMAIN, {
cover.DOMAIN: {
'platform': 'mqtt',
'name': 'test',
'state_topic': 'state-topic',
'command_topic': 'command-topic',
'payload_open': 'OPEN',
'payload_close': 'CLOSE',
'payload_stop': 'STOP'
}
})
state_attributes_dict = self.hass.states.get(
'cover.test').attributes
self.assertTrue('current_position' in state_attributes_dict)
fire_mqtt_message(self.hass, 'state-topic', '0')
self.hass.pool.block_till_done()
current_cover_position = self.hass.states.get(
'cover.test').attributes['current_position']
self.assertEqual(0, current_cover_position)
fire_mqtt_message(self.hass, 'state-topic', '50')
self.hass.pool.block_till_done()
current_cover_position = self.hass.states.get(
'cover.test').attributes['current_position']
self.assertEqual(50, current_cover_position)
fire_mqtt_message(self.hass, 'state-topic', '101')
self.hass.pool.block_till_done()
current_cover_position = self.hass.states.get(
'cover.test').attributes['current_position']
self.assertEqual(50, current_cover_position)
fire_mqtt_message(self.hass, 'state-topic', 'non-numeric')
self.hass.pool.block_till_done()
current_cover_position = self.hass.states.get(
'cover.test').attributes['current_position']
self.assertEqual(50, current_cover_position)

View File

@ -0,0 +1,216 @@
"""The tests for the Rfxtrx cover platform."""
import unittest
from homeassistant.bootstrap import _setup_component
from homeassistant.components import rfxtrx as rfxtrx_core
from tests.common import get_test_home_assistant
class TestCoverRfxtrx(unittest.TestCase):
"""Test the Rfxtrx cover platform."""
def setUp(self):
"""Setup things to be run when tests are started."""
self.hass = get_test_home_assistant(0)
self.hass.config.components = ['rfxtrx']
def tearDown(self):
"""Stop everything that was started."""
rfxtrx_core.RECEIVED_EVT_SUBSCRIBERS = []
rfxtrx_core.RFX_DEVICES = {}
if rfxtrx_core.RFXOBJECT:
rfxtrx_core.RFXOBJECT.close_connection()
self.hass.stop()
def test_valid_config(self):
"""Test configuration."""
self.assertTrue(_setup_component(self.hass, 'cover', {
'cover': {'platform': 'rfxtrx',
'automatic_add': True,
'devices':
{'0b1100cd0213c7f210010f51': {
'name': 'Test',
rfxtrx_core.ATTR_FIREEVENT: True}
}}}))
def test_invalid_config_capital_letters(self):
"""Test configuration."""
self.assertFalse(_setup_component(self.hass, 'cover', {
'cover': {'platform': 'rfxtrx',
'automatic_add': True,
'devices':
{'2FF7f216': {
'name': 'Test',
'packetid': '0b1100cd0213c7f210010f51',
'signal_repetitions': 3}
}}}))
def test_invalid_config_extra_key(self):
"""Test configuration."""
self.assertFalse(_setup_component(self.hass, 'cover', {
'cover': {'platform': 'rfxtrx',
'automatic_add': True,
'invalid_key': 'afda',
'devices':
{'213c7f216': {
'name': 'Test',
'packetid': '0b1100cd0213c7f210010f51',
rfxtrx_core.ATTR_FIREEVENT: True}
}}}))
def test_invalid_config_capital_packetid(self):
"""Test configuration."""
self.assertFalse(_setup_component(self.hass, 'cover', {
'cover': {'platform': 'rfxtrx',
'automatic_add': True,
'devices':
{'213c7f216': {
'name': 'Test',
'packetid': 'AA1100cd0213c7f210010f51',
rfxtrx_core.ATTR_FIREEVENT: True}
}}}))
def test_invalid_config_missing_packetid(self):
"""Test configuration."""
self.assertFalse(_setup_component(self.hass, 'cover', {
'cover': {'platform': 'rfxtrx',
'automatic_add': True,
'devices':
{'213c7f216': {
'name': 'Test',
rfxtrx_core.ATTR_FIREEVENT: True}
}}}))
def test_default_config(self):
"""Test with 0 cover."""
self.assertTrue(_setup_component(self.hass, 'cover', {
'cover': {'platform': 'rfxtrx',
'devices': {}}}))
self.assertEqual(0, len(rfxtrx_core.RFX_DEVICES))
def test_one_cover(self):
"""Test with 1 cover."""
self.assertTrue(_setup_component(self.hass, 'cover', {
'cover': {'platform': 'rfxtrx',
'devices':
{'0b1400cd0213c7f210010f51': {
'name': 'Test'
}}}}))
import RFXtrx as rfxtrxmod
rfxtrx_core.RFXOBJECT =\
rfxtrxmod.Core("", transport_protocol=rfxtrxmod.DummyTransport)
self.assertEqual(1, len(rfxtrx_core.RFX_DEVICES))
for id in rfxtrx_core.RFX_DEVICES:
entity = rfxtrx_core.RFX_DEVICES[id]
self.assertEqual(entity.signal_repetitions, 1)
self.assertFalse(entity.should_fire_event)
self.assertFalse(entity.should_poll)
entity.open_cover()
entity.close_cover()
entity.stop_cover()
def test_several_covers(self):
"""Test with 3 covers."""
self.assertTrue(_setup_component(self.hass, 'cover', {
'cover': {'platform': 'rfxtrx',
'signal_repetitions': 3,
'devices':
{'0b1100cd0213c7f230010f71': {
'name': 'Test'},
'0b1100100118cdea02010f70': {
'name': 'Bath'},
'0b1100101118cdea02010f70': {
'name': 'Living'}
}}}))
self.assertEqual(3, len(rfxtrx_core.RFX_DEVICES))
device_num = 0
for id in rfxtrx_core.RFX_DEVICES:
entity = rfxtrx_core.RFX_DEVICES[id]
self.assertEqual(entity.signal_repetitions, 3)
if entity.name == 'Living':
device_num = device_num + 1
elif entity.name == 'Bath':
device_num = device_num + 1
elif entity.name == 'Test':
device_num = device_num + 1
self.assertEqual(3, device_num)
def test_discover_covers(self):
"""Test with discovery of covers."""
self.assertTrue(_setup_component(self.hass, 'cover', {
'cover': {'platform': 'rfxtrx',
'automatic_add': True,
'devices': {}}}))
event = rfxtrx_core.get_rfx_object('0a140002f38cae010f0070')
event.data = bytearray([0x0A, 0x14, 0x00, 0x02, 0xF3, 0x8C,
0xAE, 0x01, 0x0F, 0x00, 0x70])
for evt_sub in rfxtrx_core.RECEIVED_EVT_SUBSCRIBERS:
evt_sub(event)
self.assertEqual(1, len(rfxtrx_core.RFX_DEVICES))
event = rfxtrx_core.get_rfx_object('0a1400adf394ab020e0060')
event.data = bytearray([0x0A, 0x14, 0x00, 0xAD, 0xF3, 0x94,
0xAB, 0x02, 0x0E, 0x00, 0x60])
for evt_sub in rfxtrx_core.RECEIVED_EVT_SUBSCRIBERS:
evt_sub(event)
self.assertEqual(2, len(rfxtrx_core.RFX_DEVICES))
# Trying to add a sensor
event = rfxtrx_core.get_rfx_object('0a52085e070100b31b0279')
event.data = bytearray(b'\nR\x08^\x07\x01\x00\xb3\x1b\x02y')
for evt_sub in rfxtrx_core.RECEIVED_EVT_SUBSCRIBERS:
evt_sub(event)
self.assertEqual(2, len(rfxtrx_core.RFX_DEVICES))
# Trying to add a light
event = rfxtrx_core.get_rfx_object('0b1100100118cdea02010f70')
event.data = bytearray([0x0b, 0x11, 0x11, 0x10, 0x01, 0x18,
0xcd, 0xea, 0x01, 0x02, 0x0f, 0x70])
for evt_sub in rfxtrx_core.RECEIVED_EVT_SUBSCRIBERS:
evt_sub(event)
self.assertEqual(2, len(rfxtrx_core.RFX_DEVICES))
def test_discover_cover_noautoadd(self):
"""Test with discovery of cover when auto add is False."""
self.assertTrue(_setup_component(self.hass, 'cover', {
'cover': {'platform': 'rfxtrx',
'automatic_add': False,
'devices': {}}}))
event = rfxtrx_core.get_rfx_object('0a1400adf394ab010d0060')
event.data = bytearray([0x0A, 0x14, 0x00, 0xAD, 0xF3, 0x94,
0xAB, 0x01, 0x0D, 0x00, 0x60])
for evt_sub in rfxtrx_core.RECEIVED_EVT_SUBSCRIBERS:
evt_sub(event)
self.assertEqual(0, len(rfxtrx_core.RFX_DEVICES))
event = rfxtrx_core.get_rfx_object('0a1400adf394ab020e0060')
event.data = bytearray([0x0A, 0x14, 0x00, 0xAD, 0xF3, 0x94,
0xAB, 0x02, 0x0E, 0x00, 0x60])
for evt_sub in rfxtrx_core.RECEIVED_EVT_SUBSCRIBERS:
evt_sub(event)
self.assertEqual(0, len(rfxtrx_core.RFX_DEVICES))
# Trying to add a sensor
event = rfxtrx_core.get_rfx_object('0a52085e070100b31b0279')
event.data = bytearray(b'\nR\x08^\x07\x01\x00\xb3\x1b\x02y')
for evt_sub in rfxtrx_core.RECEIVED_EVT_SUBSCRIBERS:
evt_sub(event)
self.assertEqual(0, len(rfxtrx_core.RFX_DEVICES))
# Trying to add a light
event = rfxtrx_core.get_rfx_object('0b1100100118cdea02010f70')
event.data = bytearray([0x0b, 0x11, 0x11, 0x10, 0x01,
0x18, 0xcd, 0xea, 0x01, 0x02, 0x0f, 0x70])
for evt_sub in rfxtrx_core.RECEIVED_EVT_SUBSCRIBERS:
evt_sub(event)
self.assertEqual(0, len(rfxtrx_core.RFX_DEVICES))