Merge remote-tracking branch 'upstream/dev' into dev

This commit is contained in:
pavoni 2015-09-15 14:30:01 +01:00
commit 408f0cff78
67 changed files with 2907 additions and 758 deletions

View File

@ -46,9 +46,11 @@ omit =
homeassistant/components/light/limitlessled.py
homeassistant/components/media_player/cast.py
homeassistant/components/media_player/denon.py
homeassistant/components/media_player/itunes.py
homeassistant/components/media_player/kodi.py
homeassistant/components/media_player/mpd.py
homeassistant/components/media_player/squeezebox.py
homeassistant/components/media_player/sonos.py
homeassistant/components/notify/file.py
homeassistant/components/notify/instapush.py
homeassistant/components/notify/nma.py
@ -60,9 +62,11 @@ omit =
homeassistant/components/notify/xmpp.py
homeassistant/components/sensor/arest.py
homeassistant/components/sensor/bitcoin.py
homeassistant/components/sensor/command_sensor.py
homeassistant/components/sensor/dht.py
homeassistant/components/sensor/efergy.py
homeassistant/components/sensor/forecast.py
homeassistant/components/sensor/glances.py
homeassistant/components/sensor/mysensors.py
homeassistant/components/sensor/openweathermap.py
homeassistant/components/sensor/rfxtrx.py
@ -73,6 +77,7 @@ omit =
homeassistant/components/sensor/temper.py
homeassistant/components/sensor/time_date.py
homeassistant/components/sensor/transmission.py
homeassistant/components/switch/arest.py
homeassistant/components/switch/command_switch.py
homeassistant/components/switch/edimax.py
homeassistant/components/switch/hikvisioncam.py

View File

@ -18,7 +18,7 @@ Examples of devices it can interface it:
* Monitoring connected devices to a wireless router: [OpenWrt](https://openwrt.org/), [Tomato](http://www.polarcloud.com/tomato), [Netgear](http://netgear.com), [DD-WRT](http://www.dd-wrt.com/site/index), [TPLink](http://www.tp-link.us/), and [ASUSWRT](http://event.asus.com/2013/nw/ASUSWRT/)
* [Philips Hue](http://meethue.com) lights, [WeMo](http://www.belkin.com/us/Products/home-automation/c/wemo-home-automation/) switches, [Edimax](http://www.edimax.com/) switches, [Efergy](https://efergy.com) energy monitoring, RFXtrx sensors, and [Tellstick](http://www.telldus.se/products/tellstick) devices and sensors
* [Google Chromecasts](http://www.google.com/intl/en/chrome/devices/chromecast), [Music Player Daemon](http://www.musicpd.org/), [Logitech Squeezebox](https://en.wikipedia.org/wiki/Squeezebox_%28network_music_player%29), and [Kodi (XBMC)](http://kodi.tv/)
* [Google Chromecasts](http://www.google.com/intl/en/chrome/devices/chromecast), [Music Player Daemon](http://www.musicpd.org/), [Logitech Squeezebox](https://en.wikipedia.org/wiki/Squeezebox_%28network_music_player%29), [Kodi (XBMC)](http://kodi.tv/), and iTunes (by way of [itunes-api](https://github.com/maddox/itunes-api))
* Support for [ISY994](https://www.universal-devices.com/residential/isy994i-series/) (Insteon and X10 devices), [Z-Wave](http://www.z-wave.com/), [Nest Thermostats](https://nest.com/), [Arduino](https://www.arduino.cc/), [Raspberry Pi](https://www.raspberrypi.org/), and [Modbus](http://www.modbus.org/)
* Integrate data from the [Bitcoin](https://bitcoin.org) network, meteorological data from [OpenWeatherMap](http://openweathermap.org/) and [Forecast.io](https://forecast.io/), [Transmission](http://www.transmissionbt.com/), or [SABnzbd](http://sabnzbd.org).
* [See full list of supported devices](https://home-assistant.io/components/)

View File

@ -95,6 +95,14 @@ def get_arguments():
type=int,
default=None,
help='Enables daily log rotation and keeps up to the specified days')
parser.add_argument(
'--install-osx',
action='store_true',
help='Installs as a service on OS X and loads on boot.')
parser.add_argument(
'--uninstall-osx',
action='store_true',
help='Uninstalls from OS X.')
if os.name != "nt":
parser.add_argument(
'--daemon',
@ -152,6 +160,46 @@ def write_pid(pid_file):
sys.exit(1)
def install_osx():
""" Setup to run via launchd on OS X """
with os.popen('which hass') as inp:
hass_path = inp.read().strip()
with os.popen('whoami') as inp:
user = inp.read().strip()
cwd = os.path.dirname(__file__)
template_path = os.path.join(cwd, 'startup', 'launchd.plist')
with open(template_path, 'r', encoding='utf-8') as inp:
plist = inp.read()
plist = plist.replace("$HASS_PATH$", hass_path)
plist = plist.replace("$USER$", user)
path = os.path.expanduser("~/Library/LaunchAgents/org.homeassistant.plist")
try:
with open(path, 'w', encoding='utf-8') as outp:
outp.write(plist)
except IOError as err:
print('Unable to write to ' + path, err)
return
os.popen('launchctl load -w -F ' + path)
print("Home Assistant has been installed. \
Open it here: http://localhost:8123")
def uninstall_osx():
""" Unload from launchd on OS X """
path = os.path.expanduser("~/Library/LaunchAgents/org.homeassistant.plist")
os.popen('launchctl unload ' + path)
print("Home Assistant has been uninstalled.")
def main():
""" Starts Home Assistant. """
validate_python()
@ -161,6 +209,14 @@ def main():
config_dir = os.path.join(os.getcwd(), args.config)
ensure_config_path(config_dir)
# os x launchd functions
if args.install_osx:
install_osx()
return
if args.uninstall_osx:
uninstall_osx()
return
# daemon functions
if args.pid_file:
check_pid(args.pid_file)

View File

@ -123,6 +123,7 @@ def prepare_setup_platform(hass, config, domain, platform_name):
# Not found
if platform is None:
_LOGGER.error('Unable to find platform %s', platform_path)
return None
# Already loaded

View File

@ -9,7 +9,8 @@ import logging
from homeassistant.bootstrap import prepare_setup_platform
from homeassistant.helpers import config_per_platform
from homeassistant.util import split_entity_id
from homeassistant.const import ATTR_ENTITY_ID
from homeassistant.const import ATTR_ENTITY_ID, CONF_PLATFORM
from homeassistant.components import logbook
DOMAIN = "automation"
@ -19,6 +20,7 @@ CONF_ALIAS = "alias"
CONF_SERVICE = "execute_service"
CONF_SERVICE_ENTITY_ID = "service_entity_id"
CONF_SERVICE_DATA = "service_data"
CONF_IF = "if"
_LOGGER = logging.getLogger(__name__)
@ -34,7 +36,15 @@ def setup(hass, config):
_LOGGER.error("Unknown automation platform specified: %s", p_type)
continue
if platform.register(hass, p_config, _get_action(hass, p_config)):
action = _get_action(hass, p_config)
if action is None:
return
if CONF_IF in p_config:
action = _process_if(hass, config, p_config[CONF_IF], action)
if platform.trigger(hass, p_config, action):
_LOGGER.info(
"Initialized %s rule %s", p_type, p_config.get(CONF_ALIAS, ""))
success = True
@ -48,27 +58,59 @@ def setup(hass, config):
def _get_action(hass, config):
""" Return an action based on a config. """
name = config.get(CONF_ALIAS, 'Unnamed automation')
if CONF_SERVICE not in config:
_LOGGER.error('Error setting up %s, no action specified.',
name)
return
def action():
""" Action to be executed. """
_LOGGER.info("Executing rule %s", config.get(CONF_ALIAS, ""))
_LOGGER.info('Executing %s', name)
logbook.log_entry(hass, name, 'has been triggered', DOMAIN)
if CONF_SERVICE in config:
domain, service = split_entity_id(config[CONF_SERVICE])
domain, service = split_entity_id(config[CONF_SERVICE])
service_data = config.get(CONF_SERVICE_DATA, {})
service_data = config.get(CONF_SERVICE_DATA, {})
if not isinstance(service_data, dict):
_LOGGER.error("%s should be a dictionary", CONF_SERVICE_DATA)
service_data = {}
if not isinstance(service_data, dict):
_LOGGER.error("%s should be a dictionary", CONF_SERVICE_DATA)
service_data = {}
if CONF_SERVICE_ENTITY_ID in config:
try:
service_data[ATTR_ENTITY_ID] = \
config[CONF_SERVICE_ENTITY_ID].split(",")
except AttributeError:
service_data[ATTR_ENTITY_ID] = \
config[CONF_SERVICE_ENTITY_ID]
if CONF_SERVICE_ENTITY_ID in config:
try:
service_data[ATTR_ENTITY_ID] = \
config[CONF_SERVICE_ENTITY_ID].split(",")
except AttributeError:
service_data[ATTR_ENTITY_ID] = \
config[CONF_SERVICE_ENTITY_ID]
hass.services.call(domain, service, service_data)
hass.services.call(domain, service, service_data)
return action
def _process_if(hass, config, if_configs, action):
""" Processes if checks. """
if isinstance(if_configs, dict):
if_configs = [if_configs]
for if_config in if_configs:
p_type = if_config.get(CONF_PLATFORM)
if p_type is None:
_LOGGER.error("No platform defined found for if-statement %s",
if_config)
continue
platform = prepare_setup_platform(hass, config, DOMAIN, p_type)
if platform is None or not hasattr(platform, 'if_action'):
_LOGGER.error("Unsupported if-statement platform specified: %s",
p_type)
continue
action = platform.if_action(hass, if_config, action)
return action

View File

@ -12,7 +12,7 @@ CONF_EVENT_DATA = "event_data"
_LOGGER = logging.getLogger(__name__)
def register(hass, config, action):
def trigger(hass, config, action):
""" Listen for events based on config. """
event_type = config.get(CONF_EVENT_TYPE)

View File

@ -14,7 +14,7 @@ CONF_TOPIC = 'mqtt_topic'
CONF_PAYLOAD = 'mqtt_payload'
def register(hass, config, action):
def trigger(hass, config, action):
""" Listen for state changes based on `config`. """
topic = config.get(CONF_TOPIC)
payload = config.get(CONF_PAYLOAD)

View File

@ -0,0 +1,93 @@
"""
homeassistant.components.automation.numeric_state
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Offers numeric state listening automation rules.
"""
import logging
from homeassistant.helpers.event import track_state_change
CONF_ENTITY_ID = "state_entity_id"
CONF_BELOW = "state_below"
CONF_ABOVE = "state_above"
_LOGGER = logging.getLogger(__name__)
def trigger(hass, config, action):
""" Listen for state changes based on `config`. """
entity_id = config.get(CONF_ENTITY_ID)
if entity_id is None:
_LOGGER.error("Missing configuration key %s", CONF_ENTITY_ID)
return False
below = config.get(CONF_BELOW)
above = config.get(CONF_ABOVE)
if below is None and above is None:
_LOGGER.error("Missing configuration key."
" One of %s or %s is required",
CONF_BELOW, CONF_ABOVE)
return False
# pylint: disable=unused-argument
def state_automation_listener(entity, from_s, to_s):
""" Listens for state changes and calls action. """
# Fire action if we go from outside range into range
if _in_range(to_s.state, above, below) and \
(from_s is None or not _in_range(from_s.state, above, below)):
action()
track_state_change(
hass, entity_id, state_automation_listener)
return True
def if_action(hass, config, action):
""" Wraps action method with state based condition. """
entity_id = config.get(CONF_ENTITY_ID)
if entity_id is None:
_LOGGER.error("Missing configuration key %s", CONF_ENTITY_ID)
return action
below = config.get(CONF_BELOW)
above = config.get(CONF_ABOVE)
if below is None and above is None:
_LOGGER.error("Missing configuration key."
" One of %s or %s is required",
CONF_BELOW, CONF_ABOVE)
return action
def state_if():
""" Execute action if state matches. """
state = hass.states.get(entity_id)
if state is None or _in_range(state.state, above, below):
action()
return state_if
def _in_range(value, range_start, range_end):
""" Checks if value is inside the range """
try:
value = float(value)
except ValueError:
_LOGGER.warn("Missing value in numeric check")
return False
if range_start is not None and range_end is not None:
return float(range_start) <= value < float(range_end)
elif range_end is not None:
return value < float(range_end)
else:
return float(range_start) <= value

View File

@ -13,15 +13,16 @@ from homeassistant.const import MATCH_ALL
CONF_ENTITY_ID = "state_entity_id"
CONF_FROM = "state_from"
CONF_TO = "state_to"
CONF_STATE = "state"
def register(hass, config, action):
def trigger(hass, config, action):
""" Listen for state changes based on `config`. """
entity_id = config.get(CONF_ENTITY_ID)
if entity_id is None:
logging.getLogger(__name__).error(
"Missing configuration key %s", CONF_ENTITY_ID)
"Missing trigger configuration key %s", CONF_ENTITY_ID)
return False
from_state = config.get(CONF_FROM, MATCH_ALL)
@ -35,3 +36,22 @@ def register(hass, config, action):
hass, entity_id, state_automation_listener, from_state, to_state)
return True
def if_action(hass, config, action):
""" Wraps action method with state based condition. """
entity_id = config.get(CONF_ENTITY_ID)
state = config.get(CONF_STATE)
if entity_id is None or state is None:
logging.getLogger(__name__).error(
"Missing if-condition configuration key %s or %s", CONF_ENTITY_ID,
CONF_STATE)
return action
def state_if():
""" Execute action if state matches. """
if hass.states.is_state(entity_id, state):
action()
return state_if

View File

@ -4,15 +4,23 @@ homeassistant.components.automation.time
Offers time listening automation rules.
"""
import logging
from homeassistant.util import convert
import homeassistant.util.dt as dt_util
from homeassistant.helpers.event import track_time_change
CONF_HOURS = "time_hours"
CONF_MINUTES = "time_minutes"
CONF_SECONDS = "time_seconds"
CONF_BEFORE = "before"
CONF_AFTER = "after"
CONF_WEEKDAY = "weekday"
WEEKDAYS = ['mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun']
def register(hass, config, action):
def trigger(hass, config, action):
""" Listen for state changes based on `config`. """
hours = convert(config.get(CONF_HOURS), int)
minutes = convert(config.get(CONF_MINUTES), int)
@ -26,3 +34,49 @@ def register(hass, config, action):
hour=hours, minute=minutes, second=seconds)
return True
def if_action(hass, config, action):
""" Wraps action method with time based condition. """
before = config.get(CONF_BEFORE)
after = config.get(CONF_AFTER)
weekday = config.get(CONF_WEEKDAY)
if before is None and after is None and weekday is None:
logging.getLogger(__name__).error(
"Missing if-condition configuration key %s, %s or %s",
CONF_BEFORE, CONF_AFTER, CONF_WEEKDAY)
def time_if():
""" Validate time based if-condition """
now = dt_util.now()
if before is not None:
# Strip seconds if given
before_h, before_m = before.split(':')[0:2]
before_point = now.replace(hour=int(before_h),
minute=int(before_m))
if now > before_point:
return
if after is not None:
# Strip seconds if given
after_h, after_m = after.split(':')[0:2]
after_point = now.replace(hour=int(after_h), minute=int(after_m))
if now < after_point:
return
if weekday is not None:
now_weekday = WEEKDAYS[now.weekday()]
if isinstance(weekday, str) and weekday != now_weekday or \
now_weekday not in weekday:
return
action()
return time_if

View File

@ -17,7 +17,7 @@ DOMAIN = "demo"
DEPENDENCIES = ['introduction', 'conversation']
COMPONENTS_WITH_DEMO_PLATFORM = [
'switch', 'light', 'thermostat', 'sensor', 'media_player', 'notify']
'switch', 'light', 'sensor', 'thermostat', 'media_player', 'notify']
def setup(hass, config):

View File

@ -1,52 +1,82 @@
"""
homeassistant.components.tracker
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
homeassistant.components.device_tracker
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Provides functionality to keep track of devices.
device_tracker:
platform: netgear
# Optional
# How many seconds to wait after not seeing device to consider it not home
consider_home: 180
# Seconds between each scan
interval_seconds: 12
# New found devices auto found
track_new_devices: yes
"""
import logging
import threading
import os
import csv
from datetime import timedelta
import logging
import os
import threading
from homeassistant.helpers import validate_config
from homeassistant.helpers.entity import _OVERWRITE
from homeassistant.bootstrap import prepare_setup_platform
from homeassistant.components import discovery, group
from homeassistant.config import load_yaml_config_file
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import config_per_platform
from homeassistant.helpers.entity import Entity
import homeassistant.util as util
import homeassistant.util.dt as dt_util
from homeassistant.bootstrap import prepare_setup_platform
from homeassistant.helpers.event import track_utc_time_change
from homeassistant.const import (
STATE_HOME, STATE_NOT_HOME, ATTR_ENTITY_PICTURE, ATTR_FRIENDLY_NAME,
CONF_PLATFORM, DEVICE_DEFAULT_NAME)
from homeassistant.components import group
ATTR_ENTITY_PICTURE, DEVICE_DEFAULT_NAME, STATE_HOME, STATE_NOT_HOME)
DOMAIN = "device_tracker"
DEPENDENCIES = []
SERVICE_DEVICE_TRACKER_RELOAD = "reload_devices_csv"
GROUP_NAME_ALL_DEVICES = 'all devices'
ENTITY_ID_ALL_DEVICES = group.ENTITY_ID_FORMAT.format('all_devices')
ENTITY_ID_FORMAT = DOMAIN + '.{}'
# After how much time do we consider a device not home if
# it does not show up on scans
TIME_DEVICE_NOT_FOUND = timedelta(minutes=3)
CSV_DEVICES = "known_devices.csv"
YAML_DEVICES = 'known_devices.yaml'
# Filename to save known devices to
KNOWN_DEVICES_FILE = "known_devices.csv"
CONF_TRACK_NEW = "track_new_devices"
DEFAULT_CONF_TRACK_NEW = True
CONF_SECONDS = "interval_seconds"
CONF_CONSIDER_HOME = 'consider_home'
DEFAULT_CONF_CONSIDER_HOME = 180 # seconds
DEFAULT_CONF_SECONDS = 12
CONF_SCAN_INTERVAL = "interval_seconds"
DEFAULT_SCAN_INTERVAL = 12
TRACK_NEW_DEVICES = "track_new_devices"
CONF_AWAY_HIDE = 'hide_if_away'
DEFAULT_AWAY_HIDE = False
SERVICE_SEE = 'see'
ATTR_LATITUDE = 'latitude'
ATTR_LONGITUDE = 'longitude'
ATTR_MAC = 'mac'
ATTR_DEV_ID = 'dev_id'
ATTR_HOST_NAME = 'host_name'
ATTR_LOCATION_NAME = 'location_name'
ATTR_GPS = 'gps'
DISCOVERY_PLATFORMS = {
discovery.SERVICE_NETGEAR: 'netgear',
}
_LOGGER = logging.getLogger(__name__)
# pylint: disable=too-many-arguments
def is_on(hass, entity_id=None):
""" Returns if any or specified device is home. """
@ -55,293 +85,309 @@ def is_on(hass, entity_id=None):
return hass.states.is_state(entity, STATE_HOME)
def see(hass, mac=None, dev_id=None, host_name=None, location_name=None,
gps=None):
""" Call service to notify you see device. """
data = {key: value for key, value in
((ATTR_MAC, mac),
(ATTR_DEV_ID, dev_id),
(ATTR_HOST_NAME, host_name),
(ATTR_LOCATION_NAME, location_name),
(ATTR_GPS, gps)) if value is not None}
hass.services.call(DOMAIN, SERVICE_SEE, data)
def setup(hass, config):
""" Sets up the device tracker. """
""" Setup device tracker """
yaml_path = hass.config.path(YAML_DEVICES)
csv_path = hass.config.path(CSV_DEVICES)
if os.path.isfile(csv_path) and not os.path.isfile(yaml_path) and \
convert_csv_config(csv_path, yaml_path):
os.remove(csv_path)
if not validate_config(config, {DOMAIN: [CONF_PLATFORM]}, _LOGGER):
return False
conf = config.get(DOMAIN, {})
consider_home = util.convert(conf.get(CONF_CONSIDER_HOME), int,
DEFAULT_CONF_CONSIDER_HOME)
track_new = util.convert(conf.get(CONF_TRACK_NEW), bool,
DEFAULT_CONF_TRACK_NEW)
tracker_type = config[DOMAIN].get(CONF_PLATFORM)
devices = load_config(yaml_path, hass, timedelta(seconds=consider_home))
tracker = DeviceTracker(hass, consider_home, track_new, devices)
tracker_implementation = \
prepare_setup_platform(hass, config, DOMAIN, tracker_type)
if tracker_implementation is None:
_LOGGER.error("Unknown device_tracker type specified: %s.",
tracker_type)
return False
device_scanner = tracker_implementation.get_scanner(hass, config)
if device_scanner is None:
_LOGGER.error("Failed to initialize device scanner: %s",
tracker_type)
return False
seconds = util.convert(config[DOMAIN].get(CONF_SECONDS), int,
DEFAULT_CONF_SECONDS)
track_new_devices = config[DOMAIN].get(TRACK_NEW_DEVICES) or False
_LOGGER.info("Tracking new devices: %s", track_new_devices)
tracker = DeviceTracker(hass, device_scanner, seconds, track_new_devices)
# We only succeeded if we got to parse the known devices file
return not tracker.invalid_known_devices_file
class DeviceTracker(object):
""" Class that tracks which devices are home and which are not. """
def __init__(self, hass, device_scanner, seconds, track_new_devices):
self.hass = hass
self.device_scanner = device_scanner
self.lock = threading.Lock()
# Do we track new devices by default?
self.track_new_devices = track_new_devices
# Dictionary to keep track of known devices and devices we track
self.tracked = {}
self.untracked_devices = set()
# Did we encounter an invalid known devices file
self.invalid_known_devices_file = False
# Wrap it in a func instead of lambda so it can be identified in
# the bus by its __name__ attribute.
def update_device_state(now):
""" Triggers update of the device states. """
self.update_devices(now)
dev_group = group.Group(
hass, GROUP_NAME_ALL_DEVICES, user_defined=False)
def reload_known_devices_service(service):
""" Reload known devices file. """
self._read_known_devices_file()
self.update_devices(dt_util.utcnow())
dev_group.update_tracked_entity_ids(self.device_entity_ids)
reload_known_devices_service(None)
if self.invalid_known_devices_file:
return
seconds = range(0, 60, seconds)
_LOGGER.info("Device tracker interval second=%s", seconds)
track_utc_time_change(hass, update_device_state, second=seconds)
hass.services.register(DOMAIN,
SERVICE_DEVICE_TRACKER_RELOAD,
reload_known_devices_service)
@property
def device_entity_ids(self):
""" Returns a set containing all device entity ids
that are being tracked. """
return set(device['entity_id'] for device in self.tracked.values())
def _update_state(self, now, device, is_home):
""" Update the state of a device. """
dev_info = self.tracked[device]
if is_home:
# Update last seen if at home
dev_info['last_seen'] = now
else:
# State remains at home if it has been seen in the last
# TIME_DEVICE_NOT_FOUND
is_home = now - dev_info['last_seen'] < TIME_DEVICE_NOT_FOUND
state = STATE_HOME if is_home else STATE_NOT_HOME
# overwrite properties that have been set in the config file
attr = dict(dev_info['state_attr'])
attr.update(_OVERWRITE.get(dev_info['entity_id'], {}))
self.hass.states.set(
dev_info['entity_id'], state, attr)
def update_devices(self, now):
""" Update device states based on the found devices. """
if not self.lock.acquire(False):
def setup_platform(p_type, p_config, disc_info=None):
""" Setup a device tracker platform. """
platform = prepare_setup_platform(hass, config, DOMAIN, p_type)
if platform is None:
return
try:
found_devices = set(dev.upper() for dev in
self.device_scanner.scan_devices())
if hasattr(platform, 'get_scanner'):
scanner = platform.get_scanner(hass, {DOMAIN: p_config})
for device in self.tracked:
is_home = device in found_devices
if scanner is None:
_LOGGER.error('Error setting up platform %s', p_type)
return
self._update_state(now, device, is_home)
setup_scanner_platform(hass, p_config, scanner, tracker.see)
return
if is_home:
found_devices.remove(device)
if not platform.setup_scanner(hass, p_config, tracker.see):
_LOGGER.error('Error setting up platform %s', p_type)
except Exception: # pylint: disable=broad-except
_LOGGER.exception('Error setting up platform %s', p_type)
# Did we find any devices that we didn't know about yet?
new_devices = found_devices - self.untracked_devices
for p_type, p_config in \
config_per_platform(config, DOMAIN, _LOGGER):
setup_platform(p_type, p_config)
if new_devices:
if not self.track_new_devices:
self.untracked_devices.update(new_devices)
def device_tracker_discovered(service, info):
""" Called when a device tracker platform is discovered. """
setup_platform(DISCOVERY_PLATFORMS[service], {}, info)
self._update_known_devices_file(new_devices)
finally:
self.lock.release()
discovery.listen(hass, DISCOVERY_PLATFORMS.keys(),
device_tracker_discovered)
# pylint: disable=too-many-branches
def _read_known_devices_file(self):
""" Parse and process the known devices file. """
known_dev_path = self.hass.config.path(KNOWN_DEVICES_FILE)
def update_stale(now):
""" Clean up stale devices. """
tracker.update_stale(now)
track_utc_time_change(hass, update_stale, second=range(0, 60, 5))
# Return if no known devices file exists
if not os.path.isfile(known_dev_path):
tracker.setup_group()
def see_service(call):
""" Service to see a device. """
args = {key: value for key, value in call.data.items() if key in
(ATTR_MAC, ATTR_DEV_ID, ATTR_HOST_NAME, ATTR_LOCATION_NAME,
ATTR_GPS)}
tracker.see(**args)
hass.services.register(DOMAIN, SERVICE_SEE, see_service)
return True
class DeviceTracker(object):
""" Track devices """
def __init__(self, hass, consider_home, track_new, devices):
self.hass = hass
self.devices = {dev.dev_id: dev for dev in devices}
self.mac_to_dev = {dev.mac: dev for dev in devices if dev.mac}
self.consider_home = timedelta(seconds=consider_home)
self.track_new = track_new
self.lock = threading.Lock()
entity_ids = []
for device in devices:
if device.track:
entity_ids.append(device.entity_id)
device.update_ha_state()
self.group = None
def see(self, mac=None, dev_id=None, host_name=None, location_name=None,
gps=None):
""" Notify device tracker that you see a device. """
with self.lock:
if mac is None and dev_id is None:
raise HomeAssistantError('Neither mac or device id passed in')
elif mac is not None:
mac = mac.upper()
device = self.mac_to_dev.get(mac)
if not device:
dev_id = util.slugify(host_name or mac)
else:
dev_id = str(dev_id)
device = self.devices.get(dev_id)
if device:
device.seen(host_name, location_name, gps)
if device.track:
device.update_ha_state()
return
# If no device can be found, create it
device = Device(
self.hass, self.consider_home, self.track_new, dev_id, mac,
(host_name or dev_id).replace('_', ' '))
self.devices[dev_id] = device
if mac is not None:
self.mac_to_dev[mac] = device
device.seen(host_name, location_name, gps)
if device.track:
device.update_ha_state()
# During init, we ignore the group
if self.group is not None:
self.group.update_tracked_entity_ids(
list(self.group.tracking) + [device.entity_id])
update_config(self.hass.config.path(YAML_DEVICES), dev_id, device)
def setup_group(self):
""" Initializes group for all tracked devices. """
entity_ids = (dev.entity_id for dev in self.devices.values()
if dev.track)
self.group = group.setup_group(
self.hass, GROUP_NAME_ALL_DEVICES, entity_ids, False)
def update_stale(self, now):
""" Update stale devices. """
with self.lock:
for device in self.devices.values():
if device.last_update_home and device.stale(now):
device.update_ha_state(True)
class Device(Entity):
""" Tracked device. """
# pylint: disable=too-many-instance-attributes, too-many-arguments
host_name = None
location_name = None
gps = None
last_seen = None
# Track if the last update of this device was HOME
last_update_home = False
_state = STATE_NOT_HOME
def __init__(self, hass, consider_home, track, dev_id, mac, name=None,
picture=None, away_hide=False):
self.hass = hass
self.entity_id = ENTITY_ID_FORMAT.format(dev_id)
# Timedelta object how long we consider a device home if it is not
# detected anymore.
self.consider_home = consider_home
# Device ID
self.dev_id = dev_id
self.mac = mac
# If we should track this device
self.track = track
# Configured name
self.config_name = name
# Configured picture
self.config_picture = picture
self.away_hide = away_hide
@property
def name(self):
""" Returns the name of the entity. """
return self.config_name or self.host_name or DEVICE_DEFAULT_NAME
@property
def state(self):
""" State of the device. """
return self._state
@property
def state_attributes(self):
""" Device state attributes. """
attr = {}
if self.config_picture:
attr[ATTR_ENTITY_PICTURE] = self.config_picture
if self.gps:
attr[ATTR_LATITUDE] = self.gps[0],
attr[ATTR_LONGITUDE] = self.gps[1],
return attr
@property
def hidden(self):
""" If device should be hidden. """
return self.away_hide and self.state != STATE_HOME
def seen(self, host_name=None, location_name=None, gps=None):
""" Mark the device as seen. """
self.last_seen = dt_util.utcnow()
self.host_name = host_name
self.location_name = location_name
self.gps = gps
self.update()
def stale(self, now=None):
""" Return if device state is stale. """
return self.last_seen and \
(now or dt_util.utcnow()) - self.last_seen > self.consider_home
def update(self):
""" Update state of entity. """
if not self.last_seen:
return
elif self.location_name:
self._state = self.location_name
elif self.stale():
self._state = STATE_NOT_HOME
self.last_update_home = False
else:
self._state = STATE_HOME
self.last_update_home = True
self.lock.acquire()
self.untracked_devices.clear()
def convert_csv_config(csv_path, yaml_path):
""" Convert CSV config file format to YAML. """
used_ids = set()
with open(csv_path) as inp:
for row in csv.DictReader(inp):
dev_id = util.ensure_unique_string(
util.slugify(row['name']) or DEVICE_DEFAULT_NAME, used_ids)
used_ids.add(dev_id)
device = Device(None, None, row['track'] == '1', dev_id,
row['device'], row['name'], row['picture'])
update_config(yaml_path, dev_id, device)
return True
with open(known_dev_path) as inp:
# To track which devices need an entity_id assigned
need_entity_id = []
def load_config(path, hass, consider_home):
""" Load devices from YAML config file. """
if not os.path.isfile(path):
return []
return [
Device(hass, consider_home, device.get('track', False),
str(dev_id), device.get('mac'), device.get('name'),
device.get('picture'), device.get(CONF_AWAY_HIDE, False))
for dev_id, device in load_yaml_config_file(path).items()]
# All devices that are still in this set after we read the CSV file
# have been removed from the file and thus need to be cleaned up.
removed_devices = set(self.tracked.keys())
try:
for row in csv.DictReader(inp):
device = row['device'].upper()
def setup_scanner_platform(hass, config, scanner, see_device):
""" Helper method to connect scanner-based platform to device tracker. """
interval = util.convert(config.get(CONF_SCAN_INTERVAL), int,
DEFAULT_SCAN_INTERVAL)
if row['track'] == '1':
if device in self.tracked:
# Device exists
removed_devices.remove(device)
else:
# We found a new device
need_entity_id.append(device)
# Initial scan of each mac we also tell about host name for config
seen = set()
self._track_device(device, row['name'])
def device_tracker_scan(now):
""" Called when interval matches. """
for mac in scanner.scan_devices():
if mac in seen:
host_name = None
else:
host_name = scanner.get_device_name(mac)
seen.add(mac)
see_device(mac=mac, host_name=host_name)
# Update state_attr with latest from file
state_attr = {
ATTR_FRIENDLY_NAME: row['name']
}
track_utc_time_change(hass, device_tracker_scan, second=range(0, 60,
interval))
if row['picture']:
state_attr[ATTR_ENTITY_PICTURE] = row['picture']
device_tracker_scan(None)
self.tracked[device]['state_attr'] = state_attr
else:
self.untracked_devices.add(device)
def update_config(path, dev_id, device):
""" Add device to YAML config file. """
with open(path, 'a') as out:
out.write('\n')
out.write('{}:\n'.format(device.dev_id))
# Remove existing devices that we no longer track
for device in removed_devices:
entity_id = self.tracked[device]['entity_id']
_LOGGER.info("Removing entity %s", entity_id)
self.hass.states.remove(entity_id)
self.tracked.pop(device)
self._generate_entity_ids(need_entity_id)
if not self.tracked:
_LOGGER.warning(
"No devices to track. Please update %s.",
known_dev_path)
_LOGGER.info("Loaded devices from %s", known_dev_path)
except KeyError:
self.invalid_known_devices_file = True
_LOGGER.warning(
("Invalid known devices file: %s. "
"We won't update it with new found devices."),
known_dev_path)
finally:
self.lock.release()
def _update_known_devices_file(self, new_devices):
""" Add new devices to known devices file. """
if not self.invalid_known_devices_file:
known_dev_path = self.hass.config.path(KNOWN_DEVICES_FILE)
try:
# If file does not exist we will write the header too
is_new_file = not os.path.isfile(known_dev_path)
with open(known_dev_path, 'a') as outp:
_LOGGER.info("Found %d new devices, updating %s",
len(new_devices), known_dev_path)
writer = csv.writer(outp)
if is_new_file:
writer.writerow(("device", "name", "track", "picture"))
for device in new_devices:
# See if the device scanner knows the name
# else defaults to unknown device
name = self.device_scanner.get_device_name(device) or \
DEVICE_DEFAULT_NAME
track = 0
if self.track_new_devices:
self._track_device(device, name)
track = 1
writer.writerow((device, name, track, ""))
if self.track_new_devices:
self._generate_entity_ids(new_devices)
except IOError:
_LOGGER.exception("Error updating %s with %d new devices",
known_dev_path, len(new_devices))
def _track_device(self, device, name):
"""
Add a device to the list of tracked devices.
Does not generate the entity id yet.
"""
default_last_seen = dt_util.utcnow().replace(year=1990)
self.tracked[device] = {
'name': name,
'last_seen': default_last_seen,
'state_attr': {ATTR_FRIENDLY_NAME: name}
}
def _generate_entity_ids(self, need_entity_id):
""" Generate entity ids for a list of devices. """
# Setup entity_ids for the new devices
used_entity_ids = [info['entity_id'] for device, info
in self.tracked.items()
if device not in need_entity_id]
for device in need_entity_id:
name = self.tracked[device]['name']
entity_id = util.ensure_unique_string(
ENTITY_ID_FORMAT.format(util.slugify(name)),
used_entity_ids)
used_entity_ids.append(entity_id)
self.tracked[device]['entity_id'] = entity_id
for key, value in (('name', device.name), ('mac', device.mac),
('picture', device.config_picture),
('track', 'yes' if device.track else 'no'),
(CONF_AWAY_HIDE,
'yes' if device.away_hide else 'no')):
out.write(' {}: {}\n'.format(key, '' if value is None else value))

View File

@ -157,11 +157,19 @@ class AsusWrtDeviceScanner(object):
devices = {}
for lease in leases_result:
match = _LEASES_REGEX.search(lease.decode('utf-8'))
# For leases where the client doesn't set a hostname, ensure
# it is blank and not '*', which breaks the entity_id down
# the line
host = match.group('host')
if host == '*':
host = ''
devices[match.group('ip')] = {
'host': host,
'status': '',
'ip': match.group('ip'),
'mac': match.group('mac').upper(),
'host': match.group('host'),
'status': ''
}
for neighbor in neighbors:

View File

@ -0,0 +1,48 @@
"""
homeassistant.components.device_tracker.mqtt
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
MQTT platform for the device tracker.
device_tracker:
platform: mqtt
qos: 1
devices:
paulus_oneplus: /location/paulus
annetherese_n4: /location/annetherese
"""
import logging
from homeassistant import util
import homeassistant.components.mqtt as mqtt
DEPENDENCIES = ['mqtt']
CONF_QOS = 'qos'
CONF_DEVICES = 'devices'
DEFAULT_QOS = 0
_LOGGER = logging.getLogger(__name__)
def setup_scanner(hass, config, see):
""" Set up a MQTT tracker. """
devices = config.get(CONF_DEVICES)
qos = util.convert(config.get(CONF_QOS), int, DEFAULT_QOS)
if not isinstance(devices, dict):
_LOGGER.error('Expected %s to be a dict, found %s', CONF_DEVICES,
devices)
return False
dev_id_lookup = {}
def device_tracker_message_received(topic, payload, qos):
""" MQTT message received. """
see(dev_id=dev_id_lookup[topic], location_name=payload)
for dev_id, topic in devices.items():
dev_id_lookup[topic] = dev_id
mqtt.subscribe(hass, topic, device_tracker_message_received, qos)
return True

View File

@ -70,7 +70,6 @@ class NetgearDeviceScanner(object):
self.lock = threading.Lock()
if host is None:
print("BIER")
self._api = pynetgear.Netgear()
elif username is None:
self._api = pynetgear.Netgear(password, host)

View File

@ -44,7 +44,7 @@ _LOGGER = logging.getLogger(__name__)
# interval in minutes to exclude devices from a scan while they are home
CONF_HOME_INTERVAL = "home_interval"
REQUIREMENTS = ['python-nmap==0.4.1']
REQUIREMENTS = ['python-nmap==0.4.3']
def get_scanner(hass, config):

View File

@ -19,22 +19,22 @@ from homeassistant.const import (
DOMAIN = "discovery"
DEPENDENCIES = []
REQUIREMENTS = ['netdisco==0.3']
REQUIREMENTS = ['netdisco==0.4']
SCAN_INTERVAL = 300 # seconds
# Next 3 lines for now a mirror from netdisco.const
# Should setup a mapping netdisco.const -> own constants
SERVICE_WEMO = 'belkin_wemo'
SERVICE_HUE = 'philips_hue'
SERVICE_CAST = 'google_cast'
SERVICE_NETGEAR = 'netgear_router'
SERVICE_SONOS = 'sonos'
SERVICE_HANDLERS = {
SERVICE_WEMO: "switch",
SERVICE_CAST: "media_player",
SERVICE_HUE: "light",
SERVICE_NETGEAR: 'device_tracker',
SERVICE_SONOS: 'media_player',
}
@ -79,13 +79,6 @@ def setup(hass, config):
if not component:
return
# Hack - fix when device_tracker supports discovery
if service == SERVICE_NETGEAR:
bootstrap.setup_component(hass, component, {
'device_tracker': {'platform': 'netgear'}
})
return
# This component cannot be setup.
if not bootstrap.setup_component(hass, component, config):
return

View File

@ -1,2 +1,2 @@
""" DO NOT MODIFY. Auto-generated by build_frontend script """
VERSION = "35ecb5457a9ff0f4142c2605b53eb843"
VERSION = "397aa7c09f4938b1358672c9983f9f32"

File diff suppressed because one or more lines are too long

@ -1 +1 @@
Subproject commit b0b12e20e0f61df849c414c2dfbcf9923f784631
Subproject commit 9637d5d26516873b8a04a3c62b9596163c822a2d

View File

@ -147,7 +147,7 @@ def _api_history_period(handler, path_match, data):
end_time = start_time + one_day
print("Fetchign", start_time, end_time)
print("Fetching", start_time, end_time)
entity_id = data.get('filter_entity_id')

View File

@ -205,7 +205,7 @@ class HomeAssistantHTTPServer(ThreadingMixIn, HTTPServer):
self.serve_forever()
def register_path(self, method, url, callback, require_auth=True):
""" Registers a path wit the server. """
""" Registers a path with the server. """
self.paths.append((method, url, callback, require_auth))
def log_message(self, fmt, *args):
@ -487,7 +487,7 @@ class ServerSession:
return self._expiry < date_util.utcnow()
class SessionStore:
class SessionStore(object):
""" Responsible for storing and retrieving http sessions """
def __init__(self, enabled=True):
""" Set up the session store """

View File

@ -12,9 +12,10 @@ from homeassistant.core import State, DOMAIN as HA_DOMAIN
from homeassistant.const import (
EVENT_STATE_CHANGED, STATE_HOME, STATE_ON, STATE_OFF,
EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP, HTTP_BAD_REQUEST)
from homeassistant import util
import homeassistant.util.dt as dt_util
import homeassistant.components.recorder as recorder
import homeassistant.components.sun as sun
from homeassistant.components import recorder, sun
DOMAIN = "logbook"
DEPENDENCIES = ['recorder', 'http']
@ -25,8 +26,29 @@ QUERY_EVENTS_BETWEEN = """
SELECT * FROM events WHERE time_fired > ? AND time_fired < ?
"""
EVENT_LOGBOOK_ENTRY = 'LOGBOOK_ENTRY'
GROUP_BY_MINUTES = 15
ATTR_NAME = 'name'
ATTR_MESSAGE = 'message'
ATTR_DOMAIN = 'domain'
ATTR_ENTITY_ID = 'entity_id'
def log_entry(hass, name, message, domain=None, entity_id=None):
""" Adds an entry to the logbook. """
data = {
ATTR_NAME: name,
ATTR_MESSAGE: message
}
if domain is not None:
data[ATTR_DOMAIN] = domain
if entity_id is not None:
data[ATTR_ENTITY_ID] = entity_id
hass.bus.fire(EVENT_LOGBOOK_ENTRY, data)
def setup(hass, config):
""" Listens for download events to download files. """
@ -175,6 +197,20 @@ def humanify(events):
event.time_fired, "Home Assistant", action,
domain=HA_DOMAIN)
elif event.event_type == EVENT_LOGBOOK_ENTRY:
domain = event.data.get(ATTR_DOMAIN)
entity_id = event.data.get(ATTR_ENTITY_ID)
if domain is None and entity_id is not None:
try:
domain = util.split_entity_id(str(entity_id))[0]
except IndexError:
pass
yield Entry(
event.time_fired, event.data.get(ATTR_NAME),
event.data.get(ATTR_MESSAGE), domain,
entity_id)
def _entry_message_from_state(domain, state):
""" Convert a state to a message for the logbook. """

View File

@ -19,12 +19,13 @@ from homeassistant.const import (
DOMAIN = 'media_player'
DEPENDENCIES = []
SCAN_INTERVAL = 30
SCAN_INTERVAL = 10
ENTITY_ID_FORMAT = DOMAIN + '.{}'
DISCOVERY_PLATFORMS = {
discovery.SERVICE_CAST: 'cast',
discovery.SERVICE_SONOS: 'sonos',
}
SERVICE_YOUTUBE_VIDEO = 'play_youtube_video'

View File

@ -0,0 +1,444 @@
"""
homeassistant.components.media_player.itunes
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Provides an interface to iTunes-API (https://github.com/maddox/itunes-api)
The iTunes media player will allow you to control your iTunes instance. You
can play/pause/next/previous/mute, adjust volume, etc.
In addition to controlling iTunes, your available AirPlay endpoints will be
added as media players as well. You can then individually address them append
turn them on, turn them off, or adjust their volume.
Configuration:
To use iTunes you will need to add something like the following to
your configuration.yaml file.
media_player:
platform: itunes
name: iTunes
host: http://192.168.1.16
port: 8181
Variables:
name
*Optional
The name of the device.
url
*Required
URL of your running version of iTunes-API. Example: http://192.168.1.50:8181
"""
import logging
from homeassistant.components.media_player import (
MediaPlayerDevice, MEDIA_TYPE_MUSIC, SUPPORT_PAUSE, SUPPORT_SEEK,
SUPPORT_VOLUME_SET, SUPPORT_VOLUME_MUTE, SUPPORT_PREVIOUS_TRACK,
SUPPORT_NEXT_TRACK, SUPPORT_TURN_ON, SUPPORT_TURN_OFF,
ATTR_ENTITY_PICTURE, ATTR_SUPPORTED_MEDIA_COMMANDS)
from homeassistant.const import (
STATE_IDLE, STATE_PLAYING, STATE_PAUSED, STATE_OFF, STATE_ON)
import requests
_LOGGER = logging.getLogger(__name__)
SUPPORT_ITUNES = SUPPORT_PAUSE | SUPPORT_VOLUME_SET | SUPPORT_VOLUME_MUTE | \
SUPPORT_PREVIOUS_TRACK | SUPPORT_NEXT_TRACK | SUPPORT_SEEK
SUPPORT_AIRPLAY = SUPPORT_VOLUME_SET | SUPPORT_TURN_ON | SUPPORT_TURN_OFF
DOMAIN = 'itunes'
class Itunes(object):
""" itunes-api client. """
def __init__(self, host, port):
self.host = host
self.port = port
@property
def _base_url(self):
""" Returns the base url for endpoints. """
return self.host + ":" + str(self.port)
def _request(self, method, path, params=None):
""" Makes the actual request and returns the parsed response. """
url = self._base_url + path
try:
if method == 'GET':
response = requests.get(url)
elif method == "POST":
response = requests.put(url, params)
elif method == "PUT":
response = requests.put(url, params)
elif method == "DELETE":
response = requests.delete(url)
return response.json()
except requests.exceptions.HTTPError:
return {'player_state': 'error'}
except requests.exceptions.RequestException:
return {'player_state': 'offline'}
def _command(self, named_command):
""" Makes a request for a controlling command. """
return self._request('PUT', '/' + named_command)
def now_playing(self):
""" Returns the current state. """
return self._request('GET', '/now_playing')
def set_volume(self, level):
""" Sets the volume and returns the current state, level 0-100. """
return self._request('PUT', '/volume', {'level': level})
def set_muted(self, muted):
""" Mutes and returns the current state, muted True or False. """
return self._request('PUT', '/mute', {'muted': muted})
def play(self):
""" Sets playback to play and returns the current state. """
return self._command('play')
def pause(self):
""" Sets playback to paused and returns the current state. """
return self._command('pause')
def next(self):
""" Skips to the next track and returns the current state. """
return self._command('next')
def previous(self):
""" Skips back and returns the current state. """
return self._command('previous')
def artwork_url(self):
""" Returns a URL of the current track's album art. """
return self._base_url + '/artwork'
def airplay_devices(self):
""" Returns a list of AirPlay devices. """
return self._request('GET', '/airplay_devices')
def airplay_device(self, device_id):
""" Returns an AirPlay device. """
return self._request('GET', '/airplay_devices/' + device_id)
def toggle_airplay_device(self, device_id, toggle):
""" Toggles airplay device on or off, id, toggle True or False. """
command = 'on' if toggle else 'off'
path = '/airplay_devices/' + device_id + '/' + command
return self._request('PUT', path)
def set_volume_airplay_device(self, device_id, level):
""" Sets volume, returns current state of device, id,level 0-100. """
path = '/airplay_devices/' + device_id + '/volume'
return self._request('PUT', path, {'level': level})
# pylint: disable=unused-argument
# pylint: disable=abstract-method
# pylint: disable=too-many-instance-attributes
def setup_platform(hass, config, add_devices, discovery_info=None):
""" Sets up the itunes platform. """
add_devices([
ItunesDevice(
config.get('name', 'iTunes'),
config.get('host'),
config.get('port'),
add_devices
)
])
class ItunesDevice(MediaPlayerDevice):
""" Represents a iTunes-API instance. """
# pylint: disable=too-many-public-methods
def __init__(self, name, host, port, add_devices):
self._name = name
self._host = host
self._port = port
self._add_devices = add_devices
self.client = Itunes(self._host, self._port)
self.current_volume = None
self.muted = None
self.current_title = None
self.current_album = None
self.current_artist = None
self.current_playlist = None
self.content_id = None
self.player_state = None
self.airplay_devices = {}
self.update()
def update_state(self, state_hash):
""" Update all the state properties with the passed in dictionary. """
self.player_state = state_hash.get('player_state', None)
self.current_volume = state_hash.get('volume', 0)
self.muted = state_hash.get('muted', None)
self.current_title = state_hash.get('name', None)
self.current_album = state_hash.get('album', None)
self.current_artist = state_hash.get('artist', None)
self.current_playlist = state_hash.get('playlist', None)
self.content_id = state_hash.get('id', None)
@property
def name(self):
""" Returns the name of the device. """
return self._name
@property
def state(self):
""" Returns the state of the device. """
if self.player_state == 'offline' or self.player_state is None:
return 'offline'
if self.player_state == 'error':
return 'error'
if self.player_state == 'stopped':
return STATE_IDLE
if self.player_state == 'paused':
return STATE_PAUSED
else:
return STATE_PLAYING
def update(self):
""" Retrieve latest state. """
now_playing = self.client.now_playing()
self.update_state(now_playing)
found_devices = self.client.airplay_devices()
found_devices = found_devices.get('airplay_devices', [])
new_devices = []
for device_data in found_devices:
device_id = device_data.get('id')
if self.airplay_devices.get(device_id):
# update it
airplay_device = self.airplay_devices.get(device_id)
airplay_device.update_state(device_data)
else:
# add it
airplay_device = AirPlayDevice(device_id, self.client)
airplay_device.update_state(device_data)
self.airplay_devices[device_id] = airplay_device
new_devices.append(airplay_device)
if new_devices:
self._add_devices(new_devices)
@property
def is_volume_muted(self):
""" Boolean if volume is currently muted. """
return self.muted
@property
def volume_level(self):
""" Volume level of the media player (0..1). """
return self.current_volume/100.0
@property
def media_content_id(self):
""" Content ID of current playing media. """
return self.content_id
@property
def media_content_type(self):
""" Content type of current playing media. """
return MEDIA_TYPE_MUSIC
@property
def media_image_url(self):
""" Image url of current playing media. """
if self.player_state in (STATE_PLAYING, STATE_IDLE, STATE_PAUSED) and \
self.current_title is not None:
return self.client.artwork_url()
else:
return 'https://cloud.githubusercontent.com/assets/260/9829355' \
'/33fab972-58cf-11e5-8ea2-2ca74bdaae40.png'
@property
def media_title(self):
""" Title of current playing media. """
return self.current_title
@property
def media_artist(self):
""" Artist of current playing media. (Music track only) """
return self.current_artist
@property
def media_album_name(self):
""" Album of current playing media. (Music track only) """
return self.current_album
@property
def supported_media_commands(self):
""" Flags of media commands that are supported. """
return SUPPORT_ITUNES
def set_volume_level(self, volume):
""" set volume level, range 0..1. """
response = self.client.set_volume(int(volume * 100))
self.update_state(response)
def mute_volume(self, mute):
""" mute (true) or unmute (false) media player. """
response = self.client.set_muted(mute)
self.update_state(response)
def media_play(self):
""" media_play media player. """
response = self.client.play()
self.update_state(response)
def media_pause(self):
""" media_pause media player. """
response = self.client.pause()
self.update_state(response)
def media_next_track(self):
""" media_next media player. """
response = self.client.next()
self.update_state(response)
def media_previous_track(self):
""" media_previous media player. """
response = self.client.previous()
self.update_state(response)
class AirPlayDevice(MediaPlayerDevice):
""" Represents an AirPlay device via an iTunes-API instance. """
# pylint: disable=too-many-public-methods
def __init__(self, device_id, client):
self._id = device_id
self.client = client
self.device_name = "AirPlay"
self.kind = None
self.active = False
self.selected = False
self.volume = 0
self.supports_audio = False
self.supports_video = False
self.player_state = None
def update_state(self, state_hash):
""" Update all the state properties with the passed in dictionary. """
if 'player_state' in state_hash:
self.player_state = state_hash.get('player_state', None)
if 'name' in state_hash:
self.device_name = state_hash.get('name', 'AirPlay')
if 'kind' in state_hash:
self.kind = state_hash.get('kind', None)
if 'active' in state_hash:
self.active = state_hash.get('active', None)
if 'selected' in state_hash:
self.selected = state_hash.get('selected', None)
if 'sound_volume' in state_hash:
self.volume = state_hash.get('sound_volume', 0)
if 'supports_audio' in state_hash:
self.supports_audio = state_hash.get('supports_audio', None)
if 'supports_video' in state_hash:
self.supports_video = state_hash.get('supports_video', None)
@property
def name(self):
""" Returns the name of the device. """
return self.device_name
@property
def state(self):
""" Returns the state of the device. """
if self.selected is True:
return STATE_ON
else:
return STATE_OFF
def update(self):
""" Retrieve latest state. """
@property
def volume_level(self):
return float(self.volume)/100.0
@property
def media_content_type(self):
return MEDIA_TYPE_MUSIC
@property
def supported_media_commands(self):
""" Flags of media commands that are supported. """
return SUPPORT_AIRPLAY
@property
def device_state_attributes(self):
""" Return the state attributes. """
state_attr = {}
state_attr[ATTR_SUPPORTED_MEDIA_COMMANDS] = SUPPORT_AIRPLAY
if self.state == STATE_OFF:
state_attr[ATTR_ENTITY_PICTURE] = \
('https://cloud.githubusercontent.com/assets/260/9833073'
'/6eb5c906-5958-11e5-9b4a-472cdf36be16.png')
else:
state_attr[ATTR_ENTITY_PICTURE] = \
('https://cloud.githubusercontent.com/assets/260/9833072'
'/6eb13cce-5958-11e5-996f-e2aaefbc9a24.png')
return state_attr
def set_volume_level(self, volume):
""" set volume level, range 0..1. """
volume = int(volume * 100)
response = self.client.set_volume_airplay_device(self._id, volume)
self.update_state(response)
def turn_on(self):
""" Select AirPlay. """
self.update_state({"selected": True})
self.update_ha_state()
response = self.client.toggle_airplay_device(self._id, True)
self.update_state(response)
def turn_off(self):
""" Deselect AirPlay. """
self.update_state({"selected": False})
self.update_ha_state()
response = self.client.toggle_airplay_device(self._id, False)
self.update_state(response)

View File

@ -107,6 +107,7 @@ class KodiDevice(MediaPlayerDevice):
try:
return self._server.Player.GetActivePlayers()
except jsonrpc_requests.jsonrpc.TransportError:
_LOGGER.exception('Unable to fetch kodi data')
return None
@property

View File

@ -0,0 +1,198 @@
"""
homeassistant.components.media_player.sonos
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Provides an interface to Sonos players (via SoCo)
Configuration:
To use SoCo, add something like this to your configuration:
media_player:
platform: sonos
"""
import logging
import datetime
from homeassistant.components.media_player import (
MediaPlayerDevice, SUPPORT_PAUSE, SUPPORT_SEEK, SUPPORT_VOLUME_SET,
SUPPORT_VOLUME_MUTE, SUPPORT_PREVIOUS_TRACK, SUPPORT_NEXT_TRACK,
MEDIA_TYPE_MUSIC)
from homeassistant.const import (
STATE_IDLE, STATE_PLAYING, STATE_PAUSED, STATE_UNKNOWN)
REQUIREMENTS = ['SoCo==0.11.1']
_LOGGER = logging.getLogger(__name__)
SUPPORT_SONOS = SUPPORT_PAUSE | SUPPORT_VOLUME_SET | SUPPORT_VOLUME_MUTE |\
SUPPORT_PREVIOUS_TRACK | SUPPORT_NEXT_TRACK | SUPPORT_SEEK
# pylint: disable=unused-argument
def setup_platform(hass, config, add_devices, discovery_info=None):
""" Sets up the Sonos platform. """
import soco
players = soco.discover()
if not players:
_LOGGER.warning('No Sonos speakers found. Disabling: %s', __name__)
return False
add_devices(SonosDevice(hass, p) for p in players)
_LOGGER.info('Added %s Sonos speakers', len(players))
return True
# pylint: disable=too-many-instance-attributes
# pylint: disable=too-many-public-methods
# pylint: disable=abstract-method
class SonosDevice(MediaPlayerDevice):
""" Represents a Sonos device. """
# pylint: disable=too-many-arguments
def __init__(self, hass, player):
self.hass = hass
super(SonosDevice, self).__init__()
self._player = player
self.update()
@property
def should_poll(self):
return True
def update_sonos(self, now):
""" Updates state, called by track_utc_time_change """
self.update_ha_state(True)
@property
def name(self):
""" Returns the name of the device. """
return self._name
@property
def unique_id(self):
""" Returns a unique id. """
return "{}.{}".format(self.__class__, self._player.uid)
@property
def state(self):
""" Returns the state of the device. """
if self._status == 'PAUSED_PLAYBACK':
return STATE_PAUSED
if self._status == 'PLAYING':
return STATE_PLAYING
if self._status == 'STOPPED':
return STATE_IDLE
return STATE_UNKNOWN
def update(self):
""" Retrieve latest state. """
self._name = self._player.get_speaker_info()['zone_name'].replace(
' (R)', '').replace(' (L)', '')
self._status = self._player.get_current_transport_info().get(
'current_transport_state')
self._trackinfo = self._player.get_current_track_info()
@property
def volume_level(self):
""" Volume level of the media player (0..1). """
return self._player.volume / 100.0
@property
def is_volume_muted(self):
return self._player.mute
@property
def media_content_id(self):
""" Content ID of current playing media. """
return self._trackinfo.get('title', None)
@property
def media_content_type(self):
""" Content type of current playing media. """
return MEDIA_TYPE_MUSIC
@property
def media_duration(self):
""" Duration of current playing media in seconds. """
dur = self._trackinfo.get('duration', '0:00')
# If the speaker is playing from the "line-in" source, getting
# track metadata can return NOT_IMPLEMENTED, which breaks the
# volume logic below
if dur == 'NOT_IMPLEMENTED':
return None
return sum(60 ** x[0] * int(x[1]) for x in
enumerate(reversed(dur.split(':'))))
@property
def media_image_url(self):
""" Image url of current playing media. """
if 'album_art' in self._trackinfo:
return self._trackinfo['album_art']
@property
def media_title(self):
""" Title of current playing media. """
if 'artist' in self._trackinfo and 'title' in self._trackinfo:
return '{artist} - {title}'.format(
artist=self._trackinfo['artist'],
title=self._trackinfo['title']
)
if 'title' in self._status:
return self._trackinfo['title']
@property
def supported_media_commands(self):
""" Flags of media commands that are supported. """
return SUPPORT_SONOS
def turn_off(self):
""" turn_off media player. """
self._player.pause()
def volume_up(self):
""" volume_up media player. """
self._player.volume += 1
def volume_down(self):
""" volume_down media player. """
self._player.volume -= 1
def set_volume_level(self, volume):
""" set volume level, range 0..1. """
self._player.volume = str(int(volume * 100))
def mute_volume(self, mute):
""" mute (true) or unmute (false) media player. """
self._player.mute = mute
def media_play(self):
""" media_play media player. """
self._player.play()
def media_pause(self):
""" media_pause media player. """
self._player.pause()
def media_next_track(self):
""" Send next track command. """
self._player.next()
def media_previous_track(self):
""" Send next track command. """
self._player.previous()
def media_seek(self, position):
""" Send seek command. """
self._player.seek(str(datetime.timedelta(seconds=int(position))))
def turn_on(self):
""" turn the media player on. """
self._player.play()

View File

@ -60,6 +60,7 @@ MQTT_CLIENT = None
DEFAULT_PORT = 1883
DEFAULT_KEEPALIVE = 60
DEFAULT_QOS = 0
SERVICE_PUBLISH = 'publish'
EVENT_MQTT_MESSAGE_RECEIVED = 'MQTT_MESSAGE_RECEIVED'
@ -79,17 +80,18 @@ ATTR_PAYLOAD = 'payload'
ATTR_QOS = 'qos'
def publish(hass, topic, payload, qos=0):
def publish(hass, topic, payload, qos=None):
""" Send an MQTT message. """
data = {
ATTR_TOPIC: topic,
ATTR_PAYLOAD: payload,
ATTR_QOS: qos,
}
if qos is not None:
data[ATTR_QOS] = qos
hass.services.call(DOMAIN, SERVICE_PUBLISH, data)
def subscribe(hass, topic, callback, qos=0):
def subscribe(hass, topic, callback, qos=DEFAULT_QOS):
""" Subscribe to a topic. """
def mqtt_topic_subscriber(event):
""" Match subscribed MQTT topic. """
@ -141,7 +143,7 @@ def setup(hass, config):
""" Handle MQTT publish service calls. """
msg_topic = call.data.get(ATTR_TOPIC)
payload = call.data.get(ATTR_PAYLOAD)
qos = call.data.get(ATTR_QOS)
qos = call.data.get(ATTR_QOS, DEFAULT_QOS)
if msg_topic is None or payload is None:
return
MQTT_CLIENT.publish(msg_topic, payload, qos)

View File

@ -28,7 +28,7 @@ def create_event_listener(schedule, event_listener_data):
service = event_listener_data['service']
(hour, minute, second) = [int(x) for x in
event_listener_data['time'].split(':')]
event_listener_data['time'].split(':', 3)]
return TimeEventListener(schedule, service, hour, minute, second)

View File

@ -22,6 +22,8 @@ CONF_ALIAS = "alias"
CONF_SERVICE = "execute_service"
CONF_SERVICE_DATA = "service_data"
CONF_SEQUENCE = "sequence"
CONF_EVENT = "event"
CONF_EVENT_DATA = "event_data"
CONF_DELAY = "delay"
_LOGGER = logging.getLogger(__name__)
@ -109,6 +111,8 @@ class Script(object):
for action in self.actions:
if CONF_SERVICE in action:
self._call_service(action)
elif CONF_EVENT in action:
self._fire_event(action)
elif CONF_DELAY in action:
delay = timedelta(**action[CONF_DELAY])
point_in_time = date_util.now() + delay
@ -140,3 +144,10 @@ class Script(object):
domain, service = split_entity_id(action[CONF_SERVICE])
data = action.get(CONF_SERVICE_DATA, {})
self.hass.services.call(domain, service, data)
def _fire_event(self, action):
""" Fires an event. """
self.last_action = action.get(CONF_ALIAS, action[CONF_EVENT])
_LOGGER.info("Executing script %s step %s", self.alias,
self.last_action)
self.hass.bus.fire(action[CONF_EVENT], action.get(CONF_EVENT_DATA))

View File

@ -65,7 +65,7 @@ def setup_platform(hass, config, add_devices, discovery_info=None):
resource = config.get('resource', None)
try:
response = get(resource)
response = get(resource, timeout=10)
except exceptions.MissingSchema:
_LOGGER.error("Missing resource or schema in configuration. "
"Add http:// to your URL.")
@ -141,7 +141,7 @@ class ArestData(object):
def update(self):
""" Gets the latest data from aREST device. """
try:
response = get(self.resource)
response = get(self.resource, timeout=10)
if 'error' in self.data:
del self.data['error']
self.data = response.json()['variables']

View File

@ -0,0 +1,136 @@
"""
homeassistant.components.sensor.command_sensor
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Allows to configure custom shell commands to turn a value for a sensor.
Configuration:
To use the command_line sensor you will need to add something like the
following to your configuration.yaml file.
sensor:
platform: command_sensor
name: "Command sensor"
command: sensor_command
unit_of_measurement: "°C"
correction_factor: 0.0001
decimal_places: 0
Variables:
name
*Optional
Name of the command sensor.
command
*Required
The action to take to get the value.
unit_of_measurement
*Optional
Defines the units of measurement of the sensor, if any.
correction_factor
*Optional
A float value to do some basic calculations.
decimal_places
*Optional
Number of decimal places of the value. Default is 0.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/sensor.command_sensor.html
"""
import logging
import subprocess
from datetime import timedelta
from homeassistant.helpers.entity import Entity
from homeassistant.util import Throttle
_LOGGER = logging.getLogger(__name__)
DEFAULT_NAME = "Command Sensor"
# Return cached results if last scan was less then this time ago
MIN_TIME_BETWEEN_UPDATES = timedelta(seconds=60)
# pylint: disable=unused-argument
def setup_platform(hass, config, add_devices_callback, discovery_info=None):
""" Add the Command Sensor. """
if config.get('command') is None:
_LOGGER.error('Missing required variable: "command"')
return False
data = CommandSensorData(config.get('command'))
add_devices_callback([CommandSensor(
data,
config.get('name', DEFAULT_NAME),
config.get('unit_of_measurement'),
config.get('correction_factor', None),
config.get('decimal_places', 0)
)])
# pylint: disable=too-many-arguments
class CommandSensor(Entity):
""" Represents a sensor that is returning a value of a shell commands. """
def __init__(self, data, name, unit_of_measurement, corr_factor,
decimal_places):
self.data = data
self._name = name
self._state = False
self._unit_of_measurement = unit_of_measurement
self._corr_factor = float(corr_factor)
self._decimal_places = decimal_places
self.update()
@property
def name(self):
""" The name of the sensor. """
return self._name
@property
def unit_of_measurement(self):
""" Unit the value is expressed in. """
return self._unit_of_measurement
@property
def state(self):
""" Returns the state of the device. """
return self._state
def update(self):
""" Gets the latest data and updates the state. """
self.data.update()
value = self.data.value
if value is not None:
if self._corr_factor is not None:
self._state = round((int(value) * self._corr_factor),
self._decimal_places)
else:
self._state = value
# pylint: disable=too-few-public-methods
class CommandSensorData(object):
""" Class for handling the data retrieval. """
def __init__(self, command):
self.command = command
self.value = None
@Throttle(MIN_TIME_BETWEEN_UPDATES)
def update(self):
""" Gets the latest data with a shell command. """
_LOGGER.info('Running command: %s', self.command)
try:
return_value = subprocess.check_output(self.command.split())
self.value = return_value.strip().decode('utf-8')
except subprocess.CalledProcessError:
_LOGGER.error('Command failed: %s', self.command)

View File

@ -0,0 +1,205 @@
"""
homeassistant.components.sensor.glances
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Gathers system information of hosts which running glances.
Configuration:
To use the glances sensor you will need to add something like the following
to your configuration.yaml file.
sensor:
platform: glances
name: Glances sensor
host: IP_ADDRESS
port: 61208
resources:
- 'disk_use_percent'
- 'disk_use'
- 'disk_free'
- 'memory_use_percent'
- 'memory_use'
- 'memory_free'
- 'swap_use_percent'
- 'swap_use'
- 'swap_free'
- 'processor_load'
- 'process_running'
- 'process_total'
- 'process_thread'
- 'process_sleeping'
Variables:
name
*Optional
The name of the sensor. Default is 'Glances Sensor'.
host
*Required
The IP address of your host, e.g. 192.168.1.32.
port
*Optional
The network port to connect to. Default is 61208.
resources
*Required
Resources to monitor on the host. See the configuration example above for a
list of all available conditions to monitor.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/sensor.glances.html
"""
import logging
import requests
from datetime import timedelta
from homeassistant.util import Throttle
from homeassistant.helpers.entity import Entity
_LOGGER = logging.getLogger(__name__)
DEFAULT_NAME = 'Glances Sensor'
_RESOURCE = '/api/2/all'
SENSOR_TYPES = {
'disk_use_percent': ['Disk Use', '%'],
'disk_use': ['Disk Use', 'GiB'],
'disk_free': ['Disk Free', 'GiB'],
'memory_use_percent': ['RAM Use', '%'],
'memory_use': ['RAM Use', 'MiB'],
'memory_free': ['RAM Free', 'MiB'],
'swap_use_percent': ['Swap Use', '%'],
'swap_use': ['Swap Use', 'GiB'],
'swap_free': ['Swap Free', 'GiB'],
'processor_load': ['CPU Load', ''],
'process_running': ['Running', ''],
'process_total': ['Total', ''],
'process_thread': ['Thread', ''],
'process_sleeping': ['Sleeping', '']
}
_LOGGER = logging.getLogger(__name__)
# Return cached results if last scan was less then this time ago
MIN_TIME_BETWEEN_UPDATES = timedelta(seconds=60)
# pylint: disable=unused-variable
def setup_platform(hass, config, add_devices, discovery_info=None):
""" Setup the Glances sensor. """
if not config.get('host'):
_LOGGER.error('"host:" is missing your configuration')
return False
host = config.get('host')
port = config.get('port', 61208)
url = 'http://{}:{}{}'.format(host, port, _RESOURCE)
try:
response = requests.get(url, timeout=10)
if not response.ok:
_LOGGER.error('Response status is "%s"', response.status_code)
return False
except requests.exceptions.MissingSchema:
_LOGGER.error('Missing resource or schema in configuration. '
'Please heck our details in the configuration file.')
return False
except requests.exceptions.ConnectionError:
_LOGGER.error('No route to resource/endpoint. '
'Please check the details in the configuration file.')
return False
rest = GlancesData(url)
dev = []
for resource in config['resources']:
if resource not in SENSOR_TYPES:
_LOGGER.error('Sensor type: "%s" does not exist', resource)
else:
dev.append(GlancesSensor(rest, resource))
add_devices(dev)
class GlancesSensor(Entity):
""" Implements a REST sensor. """
def __init__(self, rest, sensor_type):
self.rest = rest
self._name = SENSOR_TYPES[sensor_type][0]
self.type = sensor_type
self._state = None
self._unit_of_measurement = SENSOR_TYPES[sensor_type][1]
self.update()
@property
def name(self):
""" The name of the sensor. """
return self._name
@property
def unit_of_measurement(self):
""" Unit the value is expressed in. """
return self._unit_of_measurement
@property
def state(self):
""" Returns the state of the device. """
return self._state
# pylint: disable=too-many-branches
def update(self):
""" Gets the latest data from REST API and updates the state. """
self.rest.update()
value = self.rest.data
if value is not None:
if self.type == 'disk_use_percent':
self._state = value['fs'][0]['percent']
elif self.type == 'disk_use':
self._state = round(value['fs'][0]['used'] / 1024**3, 1)
elif self.type == 'disk_free':
self._state = round(value['fs'][0]['free'] / 1024**3, 1)
elif self.type == 'memory_use_percent':
self._state = value['mem']['percent']
elif self.type == 'memory_use':
self._state = round(value['mem']['used'] / 1024**2, 1)
elif self.type == 'memory_free':
self._state = round(value['mem']['free'] / 1024**2, 1)
elif self.type == 'swap_use_percent':
self._state = value['memswap']['percent']
elif self.type == 'swap_use':
self._state = round(value['memswap']['used'] / 1024**3, 1)
elif self.type == 'swap_free':
self._state = round(value['memswap']['free'] / 1024**3, 1)
elif self.type == 'processor_load':
self._state = value['load']['min15']
elif self.type == 'process_running':
self._state = value['processcount']['running']
elif self.type == 'process_total':
self._state = value['processcount']['total']
elif self.type == 'process_thread':
self._state = value['processcount']['thread']
elif self.type == 'process_sleeping':
self._state = value['processcount']['sleeping']
# pylint: disable=too-few-public-methods
class GlancesData(object):
""" Class for handling the data retrieval. """
def __init__(self, resource):
self.resource = resource
self.data = dict()
@Throttle(MIN_TIME_BETWEEN_UPDATES)
def update(self):
""" Gets the latest data from REST service. """
try:
response = requests.get(self.resource, timeout=10)
self.data = response.json()
except requests.exceptions.ConnectionError:
_LOGGER.error("No route to host/endpoint.")
self.data = None

View File

@ -53,7 +53,8 @@ def setup_platform(hass, config, add_devices, discovery_info=None):
try:
for location in [config.get('from', None), config.get('to', None)]:
# transport.opendata.ch doesn't play nice with requests.Session
result = get(_RESOURCE + 'locations?query=%s' % location)
result = get(_RESOURCE + 'locations?query=%s' % location,
timeout=10)
journey.append(result.json()['stations'][0]['name'])
except KeyError:
_LOGGER.exception(
@ -115,8 +116,8 @@ class PublicTransportData(object):
'from=' + self.start + '&' +
'to=' + self.destination + '&' +
'fields[]=connections/from/departureTimestamp/&' +
'fields[]=connections/')
'fields[]=connections/',
timeout=10)
connections = response.json()['connections'][:2]
try:

View File

@ -59,7 +59,6 @@ arg
Additional details for the type, eg. path, binary name, etc.
"""
import logging
import psutil
import homeassistant.util.dt as dt_util
from homeassistant.helpers.entity import Entity
@ -120,7 +119,7 @@ class SystemMonitorSensor(Entity):
@property
def name(self):
return self._name
return self._name.rstrip()
@property
def state(self):
@ -133,6 +132,7 @@ class SystemMonitorSensor(Entity):
# pylint: disable=too-many-branches
def update(self):
import psutil
if self.type == 'disk_use_percent':
self._state = psutil.disk_usage(self.argument).percent
elif self.type == 'disk_use':

View File

@ -1,7 +1,7 @@
"""
homeassistant.components.switch.arduino
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Support for switching Arduino pins on and off. So fare only digital pins are
Support for switching Arduino pins on and off. So far only digital pins are
supported.
Configuration:

View File

@ -0,0 +1,123 @@
"""
homeassistant.components.switch.arest
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The arest switch can control the digital pins of a device running with the
aREST RESTful framework for Arduino, the ESP8266, and the Raspberry Pi.
Only tested with Arduino boards so far.
Configuration:
To use the arest switch you will need to add something like the following
to your configuration.yaml file.
sensor:
platform: arest
resource: http://IP_ADDRESS
pins:
11:
name: Fan Office
12:
name: Light Desk
Variables:
resource:
*Required
IP address of the device that is exposing an aREST API.
pins:
The number of the digital pin to switch.
These are the variables for the pins array:
name
*Required
The name for the pin that will be used in the frontend.
Details for the API: http://arest.io
"""
import logging
from requests import get, exceptions
from homeassistant.components.switch import SwitchDevice
from homeassistant.const import DEVICE_DEFAULT_NAME
_LOGGER = logging.getLogger(__name__)
def setup_platform(hass, config, add_devices, discovery_info=None):
""" Get the aREST switches. """
resource = config.get('resource', None)
try:
response = get(resource, timeout=10)
except exceptions.MissingSchema:
_LOGGER.error("Missing resource or schema in configuration. "
"Add http:// to your URL.")
return False
except exceptions.ConnectionError:
_LOGGER.error("No route to device. "
"Please check the IP address in the configuration file.")
return False
dev = []
pins = config.get('pins')
for pinnum, pin in pins.items():
dev.append(ArestSwitch(resource,
response.json()['name'],
pin.get('name'),
pinnum))
add_devices(dev)
class ArestSwitch(SwitchDevice):
""" Implements an aREST switch. """
def __init__(self, resource, location, name, pin):
self._resource = resource
self._name = '{} {}'.format(location.title(), name.title()) \
or DEVICE_DEFAULT_NAME
self._pin = pin
self._state = None
request = get('{}/mode/{}/o'.format(self._resource, self._pin),
timeout=10)
if request.status_code is not 200:
_LOGGER.error("Can't set mode. Is device offline?")
@property
def name(self):
""" The name of the switch. """
return self._name
@property
def is_on(self):
""" True if device is on. """
return self._state
def turn_on(self, **kwargs):
""" Turn the device on. """
request = get('{}/digital/{}/1'.format(self._resource, self._pin),
timeout=10)
if request.status_code == 200:
self._state = True
else:
_LOGGER.error("Can't turn on pin %s at %s. Is device offline?",
self._resource, self._pin)
def turn_off(self, **kwargs):
""" Turn the device off. """
request = get('{}/digital/{}/0'.format(self._resource, self._pin),
timeout=10)
if request.status_code == 200:
self._state = False
else:
_LOGGER.error("Can't turn off pin %s at %s. Is device offline?",
self._resource, self._pin)
def update(self):
""" Gets the latest data from aREST API and updates the state. """
request = get('{}/digital/{}'.format(self._resource, self._pin),
timeout=10)
self._state = request.json()['return_value'] != 0

View File

@ -2,13 +2,44 @@
"""
homeassistant.components.switch.command_switch
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Allows to configure custom shell commands to turn a switch on/off.
Configuration:
To use the command_line switch you will need to add something like the
following to your configuration.yaml file.
switch:
platform: command_switch
switches:
name_of_the_switch:
oncmd: switch_command on for name_of_the_switch
offcmd: switch_command off for name_of_the_switch
Variables:
These are the variables for the switches array:
name_of_the_switch
*Required
Name of the command switch. Multiple entries are possible.
oncmd
*Required
The action to take for on.
offcmd
*Required
The action to take for off.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/switch.command_switch.html
"""
import logging
from homeassistant.components.switch import SwitchDevice
import subprocess
from homeassistant.components.switch import SwitchDevice
_LOGGER = logging.getLogger(__name__)
@ -22,7 +53,7 @@ def setup_platform(hass, config, add_devices_callback, discovery_info=None):
for dev_name, properties in switches.items():
devices.append(
CommandSwitch(
dev_name,
properties.get('name', dev_name),
properties.get('oncmd', 'true'),
properties.get('offcmd', 'true')))

View File

@ -23,10 +23,17 @@ SCAN_INTERVAL = 60
SERVICE_SET_AWAY_MODE = "set_away_mode"
SERVICE_SET_TEMPERATURE = "set_temperature"
STATE_HEAT = "heat"
STATE_COOL = "cool"
STATE_IDLE = "idle"
ATTR_CURRENT_TEMPERATURE = "current_temperature"
ATTR_AWAY_MODE = "away_mode"
ATTR_MAX_TEMP = "max_temp"
ATTR_MIN_TEMP = "min_temp"
ATTR_TEMPERATURE_LOW = "target_temp_low"
ATTR_TEMPERATURE_HIGH = "target_temp_high"
ATTR_OPERATION = "current_operation"
_LOGGER = logging.getLogger(__name__)
@ -126,19 +133,25 @@ class ThermostatDevice(Entity):
user_unit = self.hass.config.temperature_unit
data = {
ATTR_CURRENT_TEMPERATURE: round(convert(self.current_temperature,
thermostat_unit,
user_unit), 1),
ATTR_MIN_TEMP: round(convert(self.min_temp,
thermostat_unit,
user_unit), 0),
ATTR_MAX_TEMP: round(convert(self.max_temp,
thermostat_unit,
user_unit), 0)
ATTR_CURRENT_TEMPERATURE: round(convert(
self.current_temperature, thermostat_unit, user_unit), 1),
ATTR_MIN_TEMP: round(convert(
self.min_temp, thermostat_unit, user_unit), 0),
ATTR_MAX_TEMP: round(convert(
self.max_temp, thermostat_unit, user_unit), 0),
ATTR_TEMPERATURE: round(convert(
self.target_temperature, thermostat_unit, user_unit), 0),
ATTR_TEMPERATURE_LOW: round(convert(
self.target_temperature_low, thermostat_unit, user_unit), 0),
ATTR_TEMPERATURE_HIGH: round(convert(
self.target_temperature_high, thermostat_unit, user_unit), 0),
}
is_away = self.is_away_mode_on
operation = self.operation
if operation is not None:
data[ATTR_OPERATION] = operation
is_away = self.is_away_mode_on
if is_away is not None:
data[ATTR_AWAY_MODE] = STATE_ON if is_away else STATE_OFF
@ -152,18 +165,33 @@ class ThermostatDevice(Entity):
@property
def unit_of_measurement(self):
""" Unit of measurement this thermostat expresses itself in. """
return NotImplementedError
raise NotImplementedError
@property
def current_temperature(self):
""" Returns the current temperature. """
raise NotImplementedError
@property
def operation(self):
""" Returns current operation ie. heat, cool, idle """
return None
@property
def target_temperature(self):
""" Returns the temperature we try to reach. """
raise NotImplementedError
@property
def target_temperature_low(self):
""" Returns the lower bound temperature we try to reach. """
return self.target_temperature
@property
def target_temperature_high(self):
""" Returns the upper bound temperature we try to reach. """
return self.target_temperature
@property
def is_away_mode_on(self):
"""

View File

@ -3,9 +3,11 @@ homeassistant.components.thermostat.nest
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Adds support for Nest thermostats.
"""
import socket
import logging
from homeassistant.components.thermostat import ThermostatDevice
from homeassistant.components.thermostat import (ThermostatDevice, STATE_COOL,
STATE_IDLE, STATE_HEAT)
from homeassistant.const import (CONF_USERNAME, CONF_PASSWORD, TEMP_CELCIUS)
REQUIREMENTS = ['python-nest==2.6.0']
@ -34,12 +36,16 @@ def setup_platform(hass, config, add_devices, discovery_info=None):
return
napi = nest.Nest(username, password)
add_devices([
NestThermostat(structure, device)
for structure in napi.structures
for device in structure.devices
])
try:
add_devices([
NestThermostat(structure, device)
for structure in napi.structures
for device in structure.devices
])
except socket.error:
logger.error(
"Connection error logging into the nest web service"
)
class NestThermostat(ThermostatDevice):
@ -83,25 +89,52 @@ class NestThermostat(ThermostatDevice):
""" Returns the current temperature. """
return round(self.device.temperature, 1)
@property
def operation(self):
""" Returns current operation ie. heat, cool, idle """
if self.device.hvac_ac_state is True:
return STATE_COOL
elif self.device.hvac_heater_state is True:
return STATE_HEAT
else:
return STATE_IDLE
@property
def target_temperature(self):
""" Returns the temperature we try to reach. """
target = self.device.target
if isinstance(target, tuple):
if self.device.mode == 'range':
low, high = target
if self.current_temperature < low:
temp = low
elif self.current_temperature > high:
if self.operation == STATE_COOL:
temp = high
elif self.operation == STATE_HEAT:
temp = low
else:
temp = (low + high)/2
range_average = (low + high)/2
if self.current_temperature < range_average:
temp = low
elif self.current_temperature >= range_average:
temp = high
else:
temp = target
return round(temp, 1)
@property
def target_temperature_low(self):
""" Returns the lower bound temperature we try to reach. """
if self.device.mode == 'range':
return round(self.device.target[0], 1)
return round(self.target_temperature, 1)
@property
def target_temperature_high(self):
""" Returns the upper bound temperature we try to reach. """
if self.device.mode == 'range':
return round(self.device.target[1], 1)
return round(self.target_temperature, 1)
@property
def is_away_mode_on(self):
""" Returns if away mode is on. """
@ -109,6 +142,11 @@ class NestThermostat(ThermostatDevice):
def set_temperature(self, temperature):
""" Set new target temperature """
if self.device.mode == 'range':
if self.target_temperature == self.target_temperature_low:
temperature = (temperature, self.target_temperature_high)
elif self.target_temperature == self.target_temperature_high:
temperature = (self.target_temperature_low, temperature)
self.device.target = temperature
def turn_away_mode_on(self):

View File

@ -1,6 +1,6 @@
""" Constants used by Home Assistant components. """
__version__ = "0.7.2"
__version__ = "0.7.3dev0"
# Can be used to specify a catch all when registering state or event listeners.
MATCH_ALL = '*'
@ -40,7 +40,7 @@ STATE_ON = 'on'
STATE_OFF = 'off'
STATE_HOME = 'home'
STATE_NOT_HOME = 'not_home'
STATE_UNKNOWN = "unknown"
STATE_UNKNOWN = 'unknown'
STATE_OPEN = 'open'
STATE_CLOSED = 'closed'
STATE_PLAYING = 'playing'

View File

@ -10,8 +10,8 @@ from collections import defaultdict
from homeassistant.exceptions import NoEntitySpecifiedError
from homeassistant.const import (
ATTR_FRIENDLY_NAME, ATTR_UNIT_OF_MEASUREMENT, ATTR_HIDDEN,
STATE_ON, STATE_OFF, DEVICE_DEFAULT_NAME, TEMP_CELCIUS,
ATTR_FRIENDLY_NAME, ATTR_HIDDEN, ATTR_UNIT_OF_MEASUREMENT,
DEVICE_DEFAULT_NAME, STATE_ON, STATE_OFF, STATE_UNKNOWN, TEMP_CELCIUS,
TEMP_FAHRENHEIT)
# Dict mapping entity_id to a boolean that overwrites the hidden property
@ -44,17 +44,17 @@ class Entity(object):
@property
def name(self):
""" Returns the name of the entity. """
return self.get_name()
return DEVICE_DEFAULT_NAME
@property
def state(self):
""" Returns the state of the entity. """
return self.get_state()
return STATE_UNKNOWN
@property
def state_attributes(self):
""" Returns the state attributes. """
return {}
return None
@property
def unit_of_measurement(self):
@ -64,34 +64,12 @@ class Entity(object):
@property
def hidden(self):
""" Suggestion if the entity should be hidden from UIs. """
return self._hidden
@hidden.setter
def hidden(self, val):
""" Sets the suggestion for visibility. """
self._hidden = bool(val)
return False
def update(self):
""" Retrieve latest state. """
pass
# DEPRECATION NOTICE:
# Device is moving from getters to properties.
# For now the new properties will call the old functions
# This will be removed in the future.
def get_name(self):
""" Returns the name of the entity if any. """
return DEVICE_DEFAULT_NAME
def get_state(self):
""" Returns state of the entity. """
return "Unknown"
def get_state_attributes(self):
""" Returns optional state attributes. """
return None
# DO NOT OVERWRITE
# These properties and methods are either managed by Home Assistant or they
# are used to perform a very specific function. Overwriting these may

View File

@ -129,13 +129,13 @@ class EntityComponent(object):
if platform is None:
return
platform_name = '{}.{}'.format(self.domain, platform_type)
try:
platform.setup_platform(
self.hass, platform_config, self.add_entities, discovery_info)
self.hass.config.components.append(platform_name)
except Exception: # pylint: disable=broad-except
self.logger.exception(
'Error while setting up platform %s', platform_type)
return
platform_name = '{}.{}'.format(self.domain, platform_type)
self.hass.config.components.append(platform_name)

View File

View File

@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>Label</key>
<string>org.homeassitant</string>
<key>EnvironmentVariables</key>
<dict>
<key>PATH</key>
<string>/usr/local/bin/:/usr/bin:$PATH</string>
</dict>
<key>Program</key>
<string>$HASS_PATH$</string>
<key>AbandonProcessGroup</key>
<false/>
<key>RunAtLoad</key>
<true/>
<key>KeepAlive</key>
<dict>
<key>SuccessfulExit</key>
<false/>
</dict>
<key>StandardErrorPath</key>
<string>/Users/$USER$/Library/Logs/homeassitant.log</string>
<key>StandardOutPath</key>
<string>/Users/$USER$/Library/Logs/homeassitant.log</string>
</dict>
</plist>

View File

@ -71,7 +71,7 @@ def ensure_unique_string(preferred_string, current_strings):
""" Returns a string that is not present in current_strings.
If preferred string exists will append _2, _3, .. """
test_string = preferred_string
current_strings = list(current_strings)
current_strings = set(current_strings)
tries = 1
@ -244,22 +244,22 @@ class Throttle(object):
Wrapper that allows wrapped to be called only once per min_time.
If we cannot acquire the lock, it is running so return None.
"""
if lock.acquire(False):
try:
last_call = wrapper.last_call
if not lock.acquire(False):
return None
try:
last_call = wrapper.last_call
# Check if method is never called or no_throttle is given
force = not last_call or kwargs.pop('no_throttle', False)
# Check if method is never called or no_throttle is given
force = not last_call or kwargs.pop('no_throttle', False)
if force or datetime.now() - last_call > self.min_time:
result = method(*args, **kwargs)
wrapper.last_call = datetime.now()
return result
else:
return None
finally:
lock.release()
if force or utcnow() - last_call > self.min_time:
result = method(*args, **kwargs)
wrapper.last_call = utcnow()
return result
else:
return None
finally:
lock.release()
wrapper.last_call = None

View File

@ -25,7 +25,7 @@ pyuserinput==0.1.9
tellcore-py==1.0.4
# Nmap bindings (device_tracker.nmap)
python-nmap==0.4.1
python-nmap==0.4.3
# PushBullet bindings (notify.pushbullet)
pushbullet.py==0.7.1
@ -86,7 +86,7 @@ https://github.com/theolind/pymysensors/archive/35b87d880147a34107da0d40cb815d75
pynetgear==0.3
# Netdisco (discovery)
netdisco==0.3
netdisco==0.4
# Wemo (switch.wemo)
pywemo==0.3
@ -130,3 +130,6 @@ https://github.com/balloob/home-assistant-nzb-clients/archive/616cad591540925992
# sensor.vera
# light.vera
https://github.com/balloob/home-assistant-vera-api/archive/a8f823066ead6c7da6fb5e7abaf16fef62e63364.zip#python-vera==0.1
# Sonos bindings (media_player.sonos)
SoCo==0.11.1

View File

@ -12,7 +12,8 @@ PACKAGES = find_packages(exclude=['tests', 'tests.*'])
PACKAGE_DATA = \
{'homeassistant.components.frontend': ['index.html.template'],
'homeassistant.components.frontend.www_static': ['*.*'],
'homeassistant.components.frontend.www_static.images': ['*.*']}
'homeassistant.components.frontend.www_static.images': ['*.*'],
'homeassistant.startup': ['*.*']}
REQUIRES = [
'requests>=2,<3',

View File

@ -10,11 +10,11 @@ from unittest import mock
from homeassistant import core as ha, loader
import homeassistant.util.location as location_util
import homeassistant.util.dt as dt_util
from homeassistant.helpers.entity import ToggleEntity
from homeassistant.const import (
STATE_ON, STATE_OFF, DEVICE_DEFAULT_NAME, EVENT_TIME_CHANGED,
EVENT_STATE_CHANGED)
EVENT_STATE_CHANGED, EVENT_PLATFORM_DISCOVERED, ATTR_SERVICE,
ATTR_DISCOVERED)
from homeassistant.components import sun, mqtt
@ -38,8 +38,8 @@ def get_test_home_assistant(num_threads=None):
hass.config.latitude = 32.87336
hass.config.longitude = -117.22743
# if not loader.PREPARED:
loader. prepare(hass)
if 'custom_components.test' not in loader.AVAILABLE_COMPONENTS:
loader.prepare(hass)
return hass
@ -86,10 +86,11 @@ def fire_time_changed(hass, time):
hass.bus.fire(EVENT_TIME_CHANGED, {'now': time})
def trigger_device_tracker_scan(hass):
""" Triggers the device tracker to scan. """
fire_time_changed(
hass, dt_util.utcnow().replace(second=0) + timedelta(hours=1))
def fire_service_discovered(hass, service, info):
hass.bus.fire(EVENT_PLATFORM_DISCOVERED, {
ATTR_SERVICE: service,
ATTR_DISCOVERED: info
})
def ensure_sun_risen(hass):

View File

@ -0,0 +1,269 @@
"""
tests.test_component_demo
~~~~~~~~~~~~~~~~~~~~~~~~~~~
Tests demo component.
"""
import unittest
import homeassistant.core as ha
import homeassistant.components.automation as automation
from homeassistant.components.automation import event, numeric_state
from homeassistant.const import CONF_PLATFORM
class TestAutomationNumericState(unittest.TestCase):
""" Test the event automation. """
def setUp(self): # pylint: disable=invalid-name
self.hass = ha.HomeAssistant()
self.calls = []
def record_call(service):
self.calls.append(service)
self.hass.services.register('test', 'automation', record_call)
def tearDown(self): # pylint: disable=invalid-name
""" Stop down stuff we started. """
self.hass.stop()
def test_setup_fails_if_no_entity_id(self):
self.assertFalse(automation.setup(self.hass, {
automation.DOMAIN: {
CONF_PLATFORM: 'numeric_state',
numeric_state.CONF_BELOW: 10,
automation.CONF_SERVICE: 'test.automation'
}
}))
def test_setup_fails_if_no_condition(self):
self.assertFalse(automation.setup(self.hass, {
automation.DOMAIN: {
CONF_PLATFORM: 'numeric_state',
numeric_state.CONF_ENTITY_ID: 'test.entity',
automation.CONF_SERVICE: 'test.automation'
}
}))
def test_if_fires_on_entity_change_below(self):
self.assertTrue(automation.setup(self.hass, {
automation.DOMAIN: {
CONF_PLATFORM: 'numeric_state',
numeric_state.CONF_ENTITY_ID: 'test.entity',
numeric_state.CONF_BELOW: 10,
automation.CONF_SERVICE: 'test.automation'
}
}))
# 9 is below 10
self.hass.states.set('test.entity', 9)
self.hass.pool.block_till_done()
self.assertEqual(1, len(self.calls))
def test_if_fires_on_entity_change_over_to_below(self):
self.hass.states.set('test.entity', 11)
self.hass.pool.block_till_done()
self.assertTrue(automation.setup(self.hass, {
automation.DOMAIN: {
CONF_PLATFORM: 'numeric_state',
numeric_state.CONF_ENTITY_ID: 'test.entity',
numeric_state.CONF_BELOW: 10,
automation.CONF_SERVICE: 'test.automation'
}
}))
# 9 is below 10
self.hass.states.set('test.entity', 9)
self.hass.pool.block_till_done()
self.assertEqual(1, len(self.calls))
def test_if_not_fires_on_entity_change_below_to_below(self):
self.hass.states.set('test.entity', 9)
self.hass.pool.block_till_done()
self.assertTrue(automation.setup(self.hass, {
automation.DOMAIN: {
CONF_PLATFORM: 'numeric_state',
numeric_state.CONF_ENTITY_ID: 'test.entity',
numeric_state.CONF_BELOW: 10,
automation.CONF_SERVICE: 'test.automation'
}
}))
# 9 is below 10 so this should not fire again
self.hass.states.set('test.entity', 8)
self.hass.pool.block_till_done()
self.assertEqual(0, len(self.calls))
def test_if_fires_on_entity_change_above(self):
self.assertTrue(automation.setup(self.hass, {
automation.DOMAIN: {
CONF_PLATFORM: 'numeric_state',
numeric_state.CONF_ENTITY_ID: 'test.entity',
numeric_state.CONF_ABOVE: 10,
automation.CONF_SERVICE: 'test.automation'
}
}))
# 11 is above 10
self.hass.states.set('test.entity', 11)
self.hass.pool.block_till_done()
self.assertEqual(1, len(self.calls))
def test_if_fires_on_entity_change_below_to_above(self):
# set initial state
self.hass.states.set('test.entity', 9)
self.hass.pool.block_till_done()
self.assertTrue(automation.setup(self.hass, {
automation.DOMAIN: {
CONF_PLATFORM: 'numeric_state',
numeric_state.CONF_ENTITY_ID: 'test.entity',
numeric_state.CONF_ABOVE: 10,
automation.CONF_SERVICE: 'test.automation'
}
}))
# 11 is above 10 and 9 is below
self.hass.states.set('test.entity', 11)
self.hass.pool.block_till_done()
self.assertEqual(1, len(self.calls))
def test_if_not_fires_on_entity_change_above_to_above(self):
# set initial state
self.hass.states.set('test.entity', 11)
self.hass.pool.block_till_done()
self.assertTrue(automation.setup(self.hass, {
automation.DOMAIN: {
CONF_PLATFORM: 'numeric_state',
numeric_state.CONF_ENTITY_ID: 'test.entity',
numeric_state.CONF_ABOVE: 10,
automation.CONF_SERVICE: 'test.automation'
}
}))
# 11 is above 10 so this should fire again
self.hass.states.set('test.entity', 12)
self.hass.pool.block_till_done()
self.assertEqual(0, len(self.calls))
def test_if_fires_on_entity_change_below_range(self):
self.assertTrue(automation.setup(self.hass, {
automation.DOMAIN: {
CONF_PLATFORM: 'numeric_state',
numeric_state.CONF_ENTITY_ID: 'test.entity',
numeric_state.CONF_ABOVE: 5,
numeric_state.CONF_BELOW: 10,
automation.CONF_SERVICE: 'test.automation'
}
}))
# 9 is below 10
self.hass.states.set('test.entity', 9)
self.hass.pool.block_till_done()
self.assertEqual(1, len(self.calls))
def test_if_fires_on_entity_change_below_above_range(self):
self.assertTrue(automation.setup(self.hass, {
automation.DOMAIN: {
CONF_PLATFORM: 'numeric_state',
numeric_state.CONF_ENTITY_ID: 'test.entity',
numeric_state.CONF_ABOVE: 5,
numeric_state.CONF_BELOW: 10,
automation.CONF_SERVICE: 'test.automation'
}
}))
# 4 is below 5
self.hass.states.set('test.entity', 4)
self.hass.pool.block_till_done()
self.assertEqual(0, len(self.calls))
def test_if_fires_on_entity_change_over_to_below_range(self):
self.hass.states.set('test.entity', 11)
self.hass.pool.block_till_done()
self.assertTrue(automation.setup(self.hass, {
automation.DOMAIN: {
CONF_PLATFORM: 'numeric_state',
numeric_state.CONF_ENTITY_ID: 'test.entity',
numeric_state.CONF_ABOVE: 5,
numeric_state.CONF_BELOW: 10,
automation.CONF_SERVICE: 'test.automation'
}
}))
# 9 is below 10
self.hass.states.set('test.entity', 9)
self.hass.pool.block_till_done()
self.assertEqual(1, len(self.calls))
def test_if_fires_on_entity_change_over_to_below_above_range(self):
self.hass.states.set('test.entity', 11)
self.hass.pool.block_till_done()
self.assertTrue(automation.setup(self.hass, {
automation.DOMAIN: {
CONF_PLATFORM: 'numeric_state',
numeric_state.CONF_ENTITY_ID: 'test.entity',
numeric_state.CONF_ABOVE: 5,
numeric_state.CONF_BELOW: 10,
automation.CONF_SERVICE: 'test.automation'
}
}))
# 4 is below 5 so it should not fire
self.hass.states.set('test.entity', 4)
self.hass.pool.block_till_done()
self.assertEqual(0, len(self.calls))
def test_if_not_fires_if_entity_not_match(self):
self.assertTrue(automation.setup(self.hass, {
automation.DOMAIN: {
CONF_PLATFORM: 'numeric_state',
numeric_state.CONF_ENTITY_ID: 'test.another_entity',
numeric_state.CONF_ABOVE: 10,
automation.CONF_SERVICE: 'test.automation'
}
}))
self.hass.states.set('test.entity', 11)
self.hass.pool.block_till_done()
self.assertEqual(0, len(self.calls))
def test_if_action(self):
entity_id = 'domain.test_entity'
test_state = 10
automation.setup(self.hass, {
automation.DOMAIN: {
CONF_PLATFORM: 'event',
event.CONF_EVENT_TYPE: 'test_event',
automation.CONF_SERVICE: 'test.automation',
automation.CONF_IF: [{
CONF_PLATFORM: 'numeric_state',
numeric_state.CONF_ENTITY_ID: entity_id,
numeric_state.CONF_ABOVE: test_state,
numeric_state.CONF_BELOW: test_state + 2,
}]
}
})
self.hass.states.set(entity_id, test_state )
self.hass.bus.fire('test_event')
self.hass.pool.block_till_done()
self.assertEqual(1, len(self.calls))
self.hass.states.set(entity_id, test_state - 1)
self.hass.bus.fire('test_event')
self.hass.pool.block_till_done()
self.assertEqual(1, len(self.calls))
self.hass.states.set(entity_id, test_state + 1)
self.hass.bus.fire('test_event')
self.hass.pool.block_till_done()
self.assertEqual(2, len(self.calls))

View File

@ -8,7 +8,7 @@ import unittest
import homeassistant.core as ha
import homeassistant.components.automation as automation
import homeassistant.components.automation.state as state
from homeassistant.components.automation import event, state
from homeassistant.const import CONF_PLATFORM
@ -137,3 +137,31 @@ class TestAutomationState(unittest.TestCase):
self.hass.states.set('test.entity', 'world')
self.hass.pool.block_till_done()
self.assertEqual(0, len(self.calls))
def test_if_action(self):
entity_id = 'domain.test_entity'
test_state = 'new_state'
automation.setup(self.hass, {
automation.DOMAIN: {
CONF_PLATFORM: 'event',
event.CONF_EVENT_TYPE: 'test_event',
automation.CONF_SERVICE: 'test.automation',
automation.CONF_IF: [{
CONF_PLATFORM: 'state',
state.CONF_ENTITY_ID: entity_id,
state.CONF_STATE: test_state,
}]
}
})
self.hass.states.set(entity_id, test_state)
self.hass.bus.fire('test_event')
self.hass.pool.block_till_done()
self.assertEqual(1, len(self.calls))
self.hass.states.set(entity_id, test_state + 'something')
self.hass.bus.fire('test_event')
self.hass.pool.block_till_done()
self.assertEqual(1, len(self.calls))

View File

@ -4,13 +4,14 @@ tests.test_component_demo
Tests demo component.
"""
from datetime import timedelta
import unittest
from unittest.mock import patch
import homeassistant.core as ha
import homeassistant.loader as loader
import homeassistant.util.dt as dt_util
import homeassistant.components.automation as automation
import homeassistant.components.automation.time as time
from homeassistant.components.automation import time, event
from homeassistant.const import CONF_PLATFORM
from tests.common import fire_time_changed
@ -94,3 +95,133 @@ class TestAutomationTime(unittest.TestCase):
self.hass.states.set('test.entity', 'world')
self.hass.pool.block_till_done()
self.assertEqual(1, len(self.calls))
def test_if_action_before(self):
automation.setup(self.hass, {
automation.DOMAIN: {
CONF_PLATFORM: 'event',
event.CONF_EVENT_TYPE: 'test_event',
automation.CONF_SERVICE: 'test.automation',
automation.CONF_IF: {
CONF_PLATFORM: 'time',
time.CONF_BEFORE: '10:00'
}
}
})
before_10 = dt_util.now().replace(hour=8)
after_10 = dt_util.now().replace(hour=14)
with patch('homeassistant.components.automation.time.dt_util.now',
return_value=before_10):
self.hass.bus.fire('test_event')
self.hass.pool.block_till_done()
self.assertEqual(1, len(self.calls))
with patch('homeassistant.components.automation.time.dt_util.now',
return_value=after_10):
self.hass.bus.fire('test_event')
self.hass.pool.block_till_done()
self.assertEqual(1, len(self.calls))
def test_if_action_after(self):
automation.setup(self.hass, {
automation.DOMAIN: {
CONF_PLATFORM: 'event',
event.CONF_EVENT_TYPE: 'test_event',
automation.CONF_SERVICE: 'test.automation',
automation.CONF_IF: {
CONF_PLATFORM: 'time',
time.CONF_AFTER: '10:00'
}
}
})
before_10 = dt_util.now().replace(hour=8)
after_10 = dt_util.now().replace(hour=14)
with patch('homeassistant.components.automation.time.dt_util.now',
return_value=before_10):
self.hass.bus.fire('test_event')
self.hass.pool.block_till_done()
self.assertEqual(0, len(self.calls))
with patch('homeassistant.components.automation.time.dt_util.now',
return_value=after_10):
self.hass.bus.fire('test_event')
self.hass.pool.block_till_done()
self.assertEqual(1, len(self.calls))
def test_if_action_one_weekday(self):
automation.setup(self.hass, {
automation.DOMAIN: {
CONF_PLATFORM: 'event',
event.CONF_EVENT_TYPE: 'test_event',
automation.CONF_SERVICE: 'test.automation',
automation.CONF_IF: {
CONF_PLATFORM: 'time',
time.CONF_WEEKDAY: 'mon',
}
}
})
days_past_monday = dt_util.now().weekday()
monday = dt_util.now() - timedelta(days=days_past_monday)
tuesday = monday + timedelta(days=1)
with patch('homeassistant.components.automation.time.dt_util.now',
return_value=monday):
self.hass.bus.fire('test_event')
self.hass.pool.block_till_done()
self.assertEqual(1, len(self.calls))
with patch('homeassistant.components.automation.time.dt_util.now',
return_value=tuesday):
self.hass.bus.fire('test_event')
self.hass.pool.block_till_done()
self.assertEqual(1, len(self.calls))
def test_if_action_list_weekday(self):
automation.setup(self.hass, {
automation.DOMAIN: {
CONF_PLATFORM: 'event',
event.CONF_EVENT_TYPE: 'test_event',
automation.CONF_SERVICE: 'test.automation',
automation.CONF_IF: {
CONF_PLATFORM: 'time',
time.CONF_WEEKDAY: ['mon', 'tue'],
}
}
})
days_past_monday = dt_util.now().weekday()
monday = dt_util.now() - timedelta(days=days_past_monday)
tuesday = monday + timedelta(days=1)
wednesday = tuesday + timedelta(days=1)
with patch('homeassistant.components.automation.time.dt_util.now',
return_value=monday):
self.hass.bus.fire('test_event')
self.hass.pool.block_till_done()
self.assertEqual(1, len(self.calls))
with patch('homeassistant.components.automation.time.dt_util.now',
return_value=tuesday):
self.hass.bus.fire('test_event')
self.hass.pool.block_till_done()
self.assertEqual(2, len(self.calls))
with patch('homeassistant.components.automation.time.dt_util.now',
return_value=wednesday):
self.hass.bus.fire('test_event')
self.hass.pool.block_till_done()
self.assertEqual(2, len(self.calls))

View File

@ -0,0 +1,235 @@
"""
tests.test_component_device_tracker
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Tests the device tracker compoments.
"""
# pylint: disable=protected-access,too-many-public-methods
import unittest
from unittest.mock import patch
from datetime import timedelta
import os
from homeassistant.config import load_yaml_config_file
from homeassistant.loader import get_component
import homeassistant.util.dt as dt_util
from homeassistant.const import (
ATTR_ENTITY_ID, ATTR_ENTITY_PICTURE, ATTR_FRIENDLY_NAME, ATTR_HIDDEN,
STATE_HOME, STATE_NOT_HOME, CONF_PLATFORM, DEVICE_DEFAULT_NAME)
import homeassistant.components.device_tracker as device_tracker
from tests.common import (
get_test_home_assistant, fire_time_changed, fire_service_discovered)
class TestComponentsDeviceTracker(unittest.TestCase):
""" Tests homeassistant.components.device_tracker module. """
def setUp(self): # pylint: disable=invalid-name
""" Init needed objects. """
self.hass = get_test_home_assistant()
self.yaml_devices = self.hass.config.path(device_tracker.YAML_DEVICES)
def tearDown(self): # pylint: disable=invalid-name
""" Stop down stuff we started. """
try:
os.remove(self.yaml_devices)
except FileNotFoundError:
pass
self.hass.stop()
def test_is_on(self):
""" Test is_on method. """
entity_id = device_tracker.ENTITY_ID_FORMAT.format('test')
self.hass.states.set(entity_id, STATE_HOME)
self.assertTrue(device_tracker.is_on(self.hass, entity_id))
self.hass.states.set(entity_id, STATE_NOT_HOME)
self.assertFalse(device_tracker.is_on(self.hass, entity_id))
def test_migrating_config(self):
csv_devices = self.hass.config.path(device_tracker.CSV_DEVICES)
self.assertFalse(os.path.isfile(csv_devices))
self.assertFalse(os.path.isfile(self.yaml_devices))
person1 = {
'mac': 'AB:CD:EF:GH:IJ:KL',
'name': 'Paulus',
'track': True,
'picture': 'http://placehold.it/200x200',
}
person2 = {
'mac': 'MN:OP:QR:ST:UV:WX:YZ',
'name': '',
'track': False,
'picture': None,
}
try:
with open(csv_devices, 'w') as fil:
fil.write('device,name,track,picture\n')
for pers in (person1, person2):
fil.write('{},{},{},{}\n'.format(
pers['mac'], pers['name'],
'1' if pers['track'] else '0', pers['picture'] or ''))
self.assertTrue(device_tracker.setup(self.hass, {}))
self.assertFalse(os.path.isfile(csv_devices))
self.assertTrue(os.path.isfile(self.yaml_devices))
yaml_config = load_yaml_config_file(self.yaml_devices)
self.assertEqual(2, len(yaml_config))
for pers, yaml_pers in zip(
(person1, person2), sorted(yaml_config.values(),
key=lambda pers: pers['mac'])):
for key, value in pers.items():
if key == 'name' and value == '':
value = DEVICE_DEFAULT_NAME
self.assertEqual(value, yaml_pers.get(key))
finally:
try:
os.remove(csv_devices)
except FileNotFoundError:
pass
def test_reading_yaml_config(self):
dev_id = 'test'
device = device_tracker.Device(
self.hass, timedelta(seconds=180), True, dev_id, 'AB:CD:EF:GH:IJ',
'Test name', 'http://test.picture', True)
device_tracker.update_config(self.yaml_devices, dev_id, device)
self.assertTrue(device_tracker.setup(self.hass, {}))
config = device_tracker.load_config(self.yaml_devices, self.hass,
device.consider_home)[0]
self.assertEqual(device.dev_id, config.dev_id)
self.assertEqual(device.track, config.track)
self.assertEqual(device.mac, config.mac)
self.assertEqual(device.config_picture, config.config_picture)
self.assertEqual(device.away_hide, config.away_hide)
self.assertEqual(device.consider_home, config.consider_home)
def test_setup_without_yaml_file(self):
self.assertTrue(device_tracker.setup(self.hass, {}))
def test_adding_unknown_device_to_config(self):
scanner = get_component('device_tracker.test').SCANNER
scanner.reset()
scanner.come_home('DEV1')
self.assertTrue(device_tracker.setup(self.hass, {
device_tracker.DOMAIN: {CONF_PLATFORM: 'test'}}))
config = device_tracker.load_config(self.yaml_devices, self.hass,
timedelta(seconds=0))[0]
self.assertEqual('DEV1', config.dev_id)
self.assertEqual(True, config.track)
def test_discovery(self):
scanner = get_component('device_tracker.test').SCANNER
with patch.dict(device_tracker.DISCOVERY_PLATFORMS, {'test': 'test'}):
with patch.object(scanner, 'scan_devices') as mock_scan:
self.assertTrue(device_tracker.setup(self.hass, {
device_tracker.DOMAIN: {CONF_PLATFORM: 'test'}}))
fire_service_discovered(self.hass, 'test', {})
self.assertTrue(mock_scan.called)
def test_update_stale(self):
scanner = get_component('device_tracker.test').SCANNER
scanner.reset()
scanner.come_home('DEV1')
self.assertTrue(device_tracker.setup(self.hass, {
device_tracker.DOMAIN: {CONF_PLATFORM: 'test'}}))
self.assertEqual(STATE_HOME,
self.hass.states.get('device_tracker.dev1').state)
scanner.leave_home('DEV1')
now = dt_util.utcnow().replace(second=0) + timedelta(hours=1)
with patch('homeassistant.util.dt.utcnow', return_value=now):
fire_time_changed(self.hass, now)
self.hass.pool.block_till_done()
self.assertEqual(STATE_NOT_HOME,
self.hass.states.get('device_tracker.dev1').state)
def test_entity_attributes(self):
dev_id = 'test_entity'
entity_id = device_tracker.ENTITY_ID_FORMAT.format(dev_id)
friendly_name = 'Paulus'
picture = 'http://placehold.it/200x200'
device = device_tracker.Device(
self.hass, timedelta(seconds=180), True, dev_id, None,
friendly_name, picture, away_hide=True)
device_tracker.update_config(self.yaml_devices, dev_id, device)
self.assertTrue(device_tracker.setup(self.hass, {}))
attrs = self.hass.states.get(entity_id).attributes
self.assertEqual(friendly_name, attrs.get(ATTR_FRIENDLY_NAME))
self.assertEqual(picture, attrs.get(ATTR_ENTITY_PICTURE))
def test_device_hidden(self):
dev_id = 'test_entity'
entity_id = device_tracker.ENTITY_ID_FORMAT.format(dev_id)
device = device_tracker.Device(
self.hass, timedelta(seconds=180), True, dev_id, None,
away_hide=True)
device_tracker.update_config(self.yaml_devices, dev_id, device)
scanner = get_component('device_tracker.test').SCANNER
scanner.reset()
self.assertTrue(device_tracker.setup(self.hass, {
device_tracker.DOMAIN: {CONF_PLATFORM: 'test'}}))
self.assertTrue(self.hass.states.get(entity_id)
.attributes.get(ATTR_HIDDEN))
def test_group_all_devices(self):
dev_id = 'test_entity'
entity_id = device_tracker.ENTITY_ID_FORMAT.format(dev_id)
device = device_tracker.Device(
self.hass, timedelta(seconds=180), True, dev_id, None,
away_hide=True)
device_tracker.update_config(self.yaml_devices, dev_id, device)
scanner = get_component('device_tracker.test').SCANNER
scanner.reset()
self.assertTrue(device_tracker.setup(self.hass, {
device_tracker.DOMAIN: {CONF_PLATFORM: 'test'}}))
state = self.hass.states.get(device_tracker.ENTITY_ID_ALL_DEVICES)
self.assertIsNotNone(state)
self.assertEqual(STATE_NOT_HOME, state.state)
self.assertSequenceEqual((entity_id,),
state.attributes.get(ATTR_ENTITY_ID))
@patch('homeassistant.components.device_tracker.DeviceTracker.see')
def test_see_service(self, mock_see):
self.assertTrue(device_tracker.setup(self.hass, {}))
mac = 'AB:CD:EF:GH'
dev_id = 'some_device'
host_name = 'example.com'
location_name = 'Work'
gps = [.3, .8]
device_tracker.see(self.hass, mac, dev_id, host_name, location_name,
gps)
self.hass.pool.block_till_done()
mock_see.assert_called_once_with(
mac=mac, dev_id=dev_id, host_name=host_name,
location_name=location_name, gps=gps)

View File

@ -0,0 +1,37 @@
import unittest
import os
from homeassistant.components import device_tracker
from homeassistant.const import CONF_PLATFORM
from tests.common import (
get_test_home_assistant, mock_mqtt_component, fire_mqtt_message)
class TestComponentsDeviceTrackerMQTT(unittest.TestCase):
def setUp(self): # pylint: disable=invalid-name
""" Init needed objects. """
self.hass = get_test_home_assistant()
mock_mqtt_component(self.hass)
def tearDown(self): # pylint: disable=invalid-name
""" Stop down stuff we started. """
try:
os.remove(self.hass.config.path(device_tracker.YAML_DEVICES))
except FileNotFoundError:
pass
def test_new_message(self):
dev_id = 'paulus'
enttiy_id = device_tracker.ENTITY_ID_FORMAT.format(dev_id)
topic = '/location/paulus'
location = 'work'
self.assertTrue(device_tracker.setup(self.hass, {
device_tracker.DOMAIN: {
CONF_PLATFORM: 'mqtt',
'devices': {dev_id: topic}
}}))
fire_mqtt_message(self.hass, topic, location)
self.hass.pool.block_till_done()
self.assertEqual(location, self.hass.states.get(enttiy_id).state)

View File

@ -4,14 +4,18 @@ tests.test_component_demo
Tests demo component.
"""
import json
import unittest
from unittest.mock import patch
import homeassistant.core as ha
import homeassistant.components.demo as demo
from homeassistant.remote import JSONEncoder
from tests.common import mock_http_component
@patch('homeassistant.components.sun.setup')
class TestDemo(unittest.TestCase):
""" Test the demo module. """
@ -23,14 +27,24 @@ class TestDemo(unittest.TestCase):
""" Stop down stuff we started. """
self.hass.stop()
def test_if_demo_state_shows_by_default(self):
def test_if_demo_state_shows_by_default(self, mock_sun_setup):
""" Test if demo state shows if we give no configuration. """
demo.setup(self.hass, {demo.DOMAIN: {}})
self.assertIsNotNone(self.hass.states.get('a.Demo_Mode'))
def test_hiding_demo_state(self):
def test_hiding_demo_state(self, mock_sun_setup):
""" Test if you can hide the demo card. """
demo.setup(self.hass, {demo.DOMAIN: {'hide_demo_state': 1}})
self.assertIsNone(self.hass.states.get('a.Demo_Mode'))
def test_all_entities_can_be_loaded_over_json(self, mock_sun_setup):
""" Test if you can hide the demo card. """
demo.setup(self.hass, {demo.DOMAIN: {'hide_demo_state': 1}})
try:
json.dumps(self.hass.states.all(), cls=JSONEncoder)
except Exception:
self.fail('Unable to convert all demo entities to JSON. '
'Wrong data in state machine!')

View File

@ -9,14 +9,14 @@ import os
import unittest
import homeassistant.loader as loader
from homeassistant.const import CONF_PLATFORM
from homeassistant.const import CONF_PLATFORM, STATE_HOME, STATE_NOT_HOME
from homeassistant.components import (
device_tracker, light, sun, device_sun_light_trigger)
from tests.common import (
get_test_config_dir, get_test_home_assistant, ensure_sun_risen,
ensure_sun_set, trigger_device_tracker_scan)
ensure_sun_set)
KNOWN_DEV_PATH = None
@ -27,7 +27,7 @@ def setUpModule(): # pylint: disable=invalid-name
global KNOWN_DEV_PATH
KNOWN_DEV_PATH = os.path.join(get_test_config_dir(),
device_tracker.KNOWN_DEVICES_FILE)
device_tracker.CSV_DEVICES)
with open(KNOWN_DEV_PATH, 'w') as fil:
fil.write('device,name,track,picture\n')
@ -37,7 +37,8 @@ def setUpModule(): # pylint: disable=invalid-name
def tearDownModule(): # pylint: disable=invalid-name
""" Stops the Home Assistant server. """
os.remove(KNOWN_DEV_PATH)
os.remove(os.path.join(get_test_config_dir(),
device_tracker.YAML_DEVICES))
class TestDeviceSunLightTrigger(unittest.TestCase):
@ -54,15 +55,16 @@ class TestDeviceSunLightTrigger(unittest.TestCase):
loader.get_component('light.test').init()
device_tracker.setup(self.hass, {
self.assertTrue(device_tracker.setup(self.hass, {
device_tracker.DOMAIN: {CONF_PLATFORM: 'test'}
})
}))
light.setup(self.hass, {
self.assertTrue(light.setup(self.hass, {
light.DOMAIN: {CONF_PLATFORM: 'test'}
})
}))
sun.setup(self.hass, {sun.DOMAIN: {sun.CONF_ELEVATION: 0}})
self.assertTrue(sun.setup(
self.hass, {sun.DOMAIN: {sun.CONF_ELEVATION: 0}}))
def tearDown(self): # pylint: disable=invalid-name
""" Stop down stuff we started. """
@ -71,8 +73,8 @@ class TestDeviceSunLightTrigger(unittest.TestCase):
def test_lights_on_when_sun_sets(self):
""" Test lights go on when there is someone home and the sun sets. """
device_sun_light_trigger.setup(
self.hass, {device_sun_light_trigger.DOMAIN: {}})
self.assertTrue(device_sun_light_trigger.setup(
self.hass, {device_sun_light_trigger.DOMAIN: {}}))
ensure_sun_risen(self.hass)
@ -92,12 +94,11 @@ class TestDeviceSunLightTrigger(unittest.TestCase):
self.hass.pool.block_till_done()
device_sun_light_trigger.setup(
self.hass, {device_sun_light_trigger.DOMAIN: {}})
self.assertTrue(device_sun_light_trigger.setup(
self.hass, {device_sun_light_trigger.DOMAIN: {}}))
self.scanner.leave_home('DEV1')
trigger_device_tracker_scan(self.hass)
self.hass.states.set(device_tracker.ENTITY_ID_ALL_DEVICES,
STATE_NOT_HOME)
self.hass.pool.block_till_done()
@ -111,11 +112,11 @@ class TestDeviceSunLightTrigger(unittest.TestCase):
self.hass.pool.block_till_done()
device_sun_light_trigger.setup(
self.hass, {device_sun_light_trigger.DOMAIN: {}})
self.assertTrue(device_sun_light_trigger.setup(
self.hass, {device_sun_light_trigger.DOMAIN: {}}))
self.scanner.come_home('DEV2')
trigger_device_tracker_scan(self.hass)
self.hass.states.set(
device_tracker.ENTITY_ID_FORMAT.format('device_2'), STATE_HOME)
self.hass.pool.block_till_done()

View File

@ -1,193 +0,0 @@
"""
tests.test_component_device_tracker
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Tests the device tracker compoments.
"""
# pylint: disable=protected-access,too-many-public-methods
import unittest
from datetime import timedelta
import os
import homeassistant.core as ha
import homeassistant.loader as loader
import homeassistant.util.dt as dt_util
from homeassistant.const import (
STATE_HOME, STATE_NOT_HOME, ATTR_ENTITY_PICTURE, CONF_PLATFORM,
DEVICE_DEFAULT_NAME)
import homeassistant.components.device_tracker as device_tracker
from tests.common import get_test_home_assistant
class TestComponentsDeviceTracker(unittest.TestCase):
""" Tests homeassistant.components.device_tracker module. """
def setUp(self): # pylint: disable=invalid-name
""" Init needed objects. """
self.hass = get_test_home_assistant()
self.known_dev_path = self.hass.config.path(
device_tracker.KNOWN_DEVICES_FILE)
def tearDown(self): # pylint: disable=invalid-name
""" Stop down stuff we started. """
self.hass.stop()
if os.path.isfile(self.known_dev_path):
os.remove(self.known_dev_path)
def test_is_on(self):
""" Test is_on method. """
entity_id = device_tracker.ENTITY_ID_FORMAT.format('test')
self.hass.states.set(entity_id, STATE_HOME)
self.assertTrue(device_tracker.is_on(self.hass, entity_id))
self.hass.states.set(entity_id, STATE_NOT_HOME)
self.assertFalse(device_tracker.is_on(self.hass, entity_id))
def test_setup(self):
""" Test setup method. """
# Bogus config
self.assertFalse(device_tracker.setup(self.hass, {}))
self.assertFalse(
device_tracker.setup(self.hass, {device_tracker.DOMAIN: {}}))
# Test with non-existing component
self.assertFalse(device_tracker.setup(
self.hass, {device_tracker.DOMAIN: {CONF_PLATFORM: 'nonexisting'}}
))
# Test with a bad known device file around
with open(self.known_dev_path, 'w') as fil:
fil.write("bad data\nbad data\n")
self.assertFalse(device_tracker.setup(self.hass, {
device_tracker.DOMAIN: {CONF_PLATFORM: 'test'}
}))
def test_writing_known_devices_file(self):
""" Test the device tracker class. """
scanner = loader.get_component(
'device_tracker.test').get_scanner(None, None)
scanner.reset()
scanner.come_home('DEV1')
scanner.come_home('DEV2')
self.assertTrue(device_tracker.setup(self.hass, {
device_tracker.DOMAIN: {CONF_PLATFORM: 'test'}
}))
# Ensure a new known devices file has been created.
# Since the device_tracker uses a set internally we cannot
# know what the order of the devices in the known devices file is.
# To ensure all the three expected lines are there, we sort the file
with open(self.known_dev_path) as fil:
self.assertEqual(
['DEV1,{},0,\n'.format(DEVICE_DEFAULT_NAME), 'DEV2,dev2,0,\n',
'device,name,track,picture\n'],
sorted(fil))
# Write one where we track dev1, dev2
with open(self.known_dev_path, 'w') as fil:
fil.write('device,name,track,picture\n')
fil.write('DEV1,device 1,1,http://example.com/dev1.jpg\n')
fil.write('DEV2,device 2,1,http://example.com/dev2.jpg\n')
scanner.leave_home('DEV1')
scanner.come_home('DEV3')
self.hass.services.call(
device_tracker.DOMAIN,
device_tracker.SERVICE_DEVICE_TRACKER_RELOAD)
self.hass.pool.block_till_done()
dev1 = device_tracker.ENTITY_ID_FORMAT.format('device_1')
dev2 = device_tracker.ENTITY_ID_FORMAT.format('device_2')
dev3 = device_tracker.ENTITY_ID_FORMAT.format('DEV3')
now = dt_util.utcnow()
# Device scanner scans every 12 seconds. We need to sync our times to
# be every 12 seconds or else the time_changed event will be ignored.
nowAlmostMinimumGone = now + device_tracker.TIME_DEVICE_NOT_FOUND
nowAlmostMinimumGone -= timedelta(
seconds=12+(nowAlmostMinimumGone.second % 12))
nowMinimumGone = now + device_tracker.TIME_DEVICE_NOT_FOUND
nowMinimumGone += timedelta(seconds=12-(nowMinimumGone.second % 12))
# Test initial is correct
self.assertTrue(device_tracker.is_on(self.hass))
self.assertFalse(device_tracker.is_on(self.hass, dev1))
self.assertTrue(device_tracker.is_on(self.hass, dev2))
self.assertIsNone(self.hass.states.get(dev3))
self.assertEqual(
'http://example.com/dev1.jpg',
self.hass.states.get(dev1).attributes.get(ATTR_ENTITY_PICTURE))
self.assertEqual(
'http://example.com/dev2.jpg',
self.hass.states.get(dev2).attributes.get(ATTR_ENTITY_PICTURE))
# Test if dev3 got added to known dev file
with open(self.known_dev_path) as fil:
self.assertEqual('DEV3,dev3,0,\n', list(fil)[-1])
# Change dev3 to track
with open(self.known_dev_path, 'w') as fil:
fil.write("device,name,track,picture\n")
fil.write('DEV1,Device 1,1,http://example.com/picture.jpg\n')
fil.write('DEV2,Device 2,1,http://example.com/picture.jpg\n')
fil.write('DEV3,DEV3,1,\n')
scanner.come_home('DEV1')
scanner.leave_home('DEV2')
# reload dev file
self.hass.services.call(
device_tracker.DOMAIN,
device_tracker.SERVICE_DEVICE_TRACKER_RELOAD)
self.hass.pool.block_till_done()
# Test what happens if a device comes home and another leaves
self.assertTrue(device_tracker.is_on(self.hass))
self.assertTrue(device_tracker.is_on(self.hass, dev1))
# Dev2 will still be home because of the error margin on time
self.assertTrue(device_tracker.is_on(self.hass, dev2))
# dev3 should be tracked now after we reload the known devices
self.assertTrue(device_tracker.is_on(self.hass, dev3))
self.assertIsNone(
self.hass.states.get(dev3).attributes.get(ATTR_ENTITY_PICTURE))
# Test if device leaves what happens, test the time span
self.hass.bus.fire(
ha.EVENT_TIME_CHANGED, {ha.ATTR_NOW: nowAlmostMinimumGone})
self.hass.pool.block_till_done()
self.assertTrue(device_tracker.is_on(self.hass))
self.assertTrue(device_tracker.is_on(self.hass, dev1))
# Dev2 will still be home because of the error time
self.assertTrue(device_tracker.is_on(self.hass, dev2))
self.assertTrue(device_tracker.is_on(self.hass, dev3))
# Now test if gone for longer then error margin
self.hass.bus.fire(
ha.EVENT_TIME_CHANGED, {ha.ATTR_NOW: nowMinimumGone})
self.hass.pool.block_till_done()
self.assertTrue(device_tracker.is_on(self.hass))
self.assertTrue(device_tracker.is_on(self.hass, dev1))
self.assertFalse(device_tracker.is_on(self.hass, dev2))
self.assertTrue(device_tracker.is_on(self.hass, dev3))

View File

@ -8,6 +8,8 @@ Tests the history component.
import time
import os
import unittest
from unittest.mock import patch
from datetime import timedelta
import homeassistant.core as ha
import homeassistant.util.dt as dt_util
@ -68,11 +70,7 @@ class TestComponentHistory(unittest.TestCase):
self.init_recorder()
states = []
# Create 10 states for 5 different entities
# After the first 5, sleep a second and save the time
# history.get_states takes the latest states BEFORE point X
for i in range(10):
for i in range(5):
state = ha.State(
'test.point_in_time_{}'.format(i % 5),
"State {}".format(i),
@ -80,19 +78,27 @@ class TestComponentHistory(unittest.TestCase):
mock_state_change_event(self.hass, state)
self.hass.pool.block_till_done()
recorder._INSTANCE.block_till_done()
if i < 5:
states.append(state)
states.append(state)
if i == 4:
time.sleep(1)
point = dt_util.utcnow()
recorder._INSTANCE.block_till_done()
self.assertEqual(
states,
sorted(
history.get_states(point), key=lambda state: state.entity_id))
point = dt_util.utcnow() + timedelta(seconds=1)
with patch('homeassistant.util.dt.utcnow', return_value=point):
for i in range(5):
state = ha.State(
'test.point_in_time_{}'.format(i % 5),
"State {}".format(i),
{'attribute_test': i})
mock_state_change_event(self.hass, state)
self.hass.pool.block_till_done()
# Get states returns everything before POINT
self.assertEqual(states,
sorted(history.get_states(point),
key=lambda state: state.entity_id))
# Test get_state here because we have a DB setup
self.assertEqual(
@ -113,22 +119,20 @@ class TestComponentHistory(unittest.TestCase):
set_state('YouTube')
start = dt_util.utcnow()
point = start + timedelta(seconds=1)
end = point + timedelta(seconds=1)
time.sleep(1)
with patch('homeassistant.util.dt.utcnow', return_value=point):
states = [
set_state('idle'),
set_state('Netflix'),
set_state('Plex'),
set_state('YouTube'),
]
states = [
set_state('idle'),
set_state('Netflix'),
set_state('Plex'),
set_state('YouTube'),
]
time.sleep(1)
end = dt_util.utcnow()
set_state('Netflix')
set_state('Plex')
with patch('homeassistant.util.dt.utcnow', return_value=end):
set_state('Netflix')
set_state('Plex')
self.assertEqual(
{entity_id: states},

View File

@ -63,6 +63,25 @@ class TestComponentHistory(unittest.TestCase):
entries[0], name='Home Assistant', message='restarted',
domain=ha.DOMAIN)
def test_process_custom_logbook_entries(self):
""" Tests if custom log book entries get added as an entry. """
name = 'Nice name'
message = 'has a custom entry'
entity_id = 'sun.sun'
entries = list(logbook.humanify((
ha.Event(logbook.EVENT_LOGBOOK_ENTRY, {
logbook.ATTR_NAME: name,
logbook.ATTR_MESSAGE: message,
logbook.ATTR_ENTITY_ID: entity_id,
}),
)))
self.assertEqual(1, len(entries))
self.assert_entry(
entries[0], name=name, message=message,
domain='sun', entity_id=entity_id)
def assert_entry(self, entry, when=None, name=None, message=None,
domain=None, entity_id=None):
""" Asserts an entry is what is expected """

View File

@ -7,9 +7,9 @@ Tests switch component.
# pylint: disable=too-many-public-methods,protected-access
import unittest
import homeassistant.loader as loader
from homeassistant import loader
from homeassistant.components import switch
from homeassistant.const import STATE_ON, STATE_OFF, CONF_PLATFORM
import homeassistant.components.switch as switch
from tests.common import get_test_home_assistant

View File

@ -34,14 +34,6 @@ class TestHelpersEntity(unittest.TestCase):
ATTR_HIDDEN,
self.hass.states.get(self.entity.entity_id).attributes)
def test_setting_hidden_to_true(self):
self.entity.hidden = True
self.entity.update_ha_state()
state = self.hass.states.get(self.entity.entity_id)
self.assertTrue(state.attributes.get(ATTR_HIDDEN))
def test_overwriting_hidden_property_to_true(self):
""" Test we can overwrite hidden property to True. """
entity.Entity.overwrite_attribute(self.entity.entity_id,
@ -50,14 +42,3 @@ class TestHelpersEntity(unittest.TestCase):
state = self.hass.states.get(self.entity.entity_id)
self.assertTrue(state.attributes.get(ATTR_HIDDEN))
def test_overwriting_hidden_property_to_false(self):
""" Test we can overwrite hidden property to True. """
entity.Entity.overwrite_attribute(self.entity.entity_id,
[ATTR_HIDDEN], [False])
self.entity.hidden = True
self.entity.update_ha_state()
self.assertNotIn(
ATTR_HIDDEN,
self.hass.states.get(self.entity.entity_id).attributes)

View File

@ -7,13 +7,13 @@ Tests component helpers.
# pylint: disable=protected-access,too-many-public-methods
import unittest
from common import get_test_home_assistant
import homeassistant.core as ha
import homeassistant.loader as loader
from homeassistant.const import STATE_ON, STATE_OFF, ATTR_ENTITY_ID
from homeassistant.helpers import extract_entity_ids
from tests.common import get_test_home_assistant
class TestComponentsCore(unittest.TestCase):
""" Tests homeassistant.components module. """

View File

@ -15,7 +15,7 @@ from homeassistant.const import (
CONF_LATITUDE, CONF_LONGITUDE, CONF_TEMPERATURE_UNIT, CONF_NAME,
CONF_TIME_ZONE)
from common import get_test_config_dir, mock_detect_location_info
from tests.common import get_test_config_dir, mock_detect_location_info
CONFIG_DIR = get_test_config_dir()
YAML_PATH = os.path.join(CONFIG_DIR, config_util.YAML_CONFIG_FILE)

View File

@ -8,10 +8,10 @@ Provides tests to verify that Home Assistant core works.
# pylint: disable=too-few-public-methods
import os
import unittest
import unittest.mock as mock
from unittest.mock import patch
import time
import threading
from datetime import datetime
from datetime import datetime, timedelta
import pytz
@ -55,29 +55,26 @@ class TestHomeAssistant(unittest.TestCase):
self.hass.pool.block_till_done()
self.assertEqual(1, len(calls))
# @patch('homeassistant.core.time.sleep')
def test_block_till_stoped(self):
""" Test if we can block till stop service is called. """
blocking_thread = threading.Thread(target=self.hass.block_till_stopped)
with patch('time.sleep'):
blocking_thread = threading.Thread(
target=self.hass.block_till_stopped)
self.assertFalse(blocking_thread.is_alive())
self.assertFalse(blocking_thread.is_alive())
blocking_thread.start()
blocking_thread.start()
# Threads are unpredictable, try 20 times if we're ready
wait_loops = 0
while not blocking_thread.is_alive() and wait_loops < 20:
wait_loops += 1
time.sleep(0.05)
self.assertTrue(blocking_thread.is_alive())
self.assertTrue(blocking_thread.is_alive())
self.hass.services.call(ha.DOMAIN, ha.SERVICE_HOMEASSISTANT_STOP)
self.hass.pool.block_till_done()
self.hass.services.call(ha.DOMAIN, ha.SERVICE_HOMEASSISTANT_STOP)
self.hass.pool.block_till_done()
# Threads are unpredictable, try 20 times if we're ready
wait_loops = 0
while blocking_thread.is_alive() and wait_loops < 20:
wait_loops += 1
# Wait for thread to stop
for _ in range(20):
if not blocking_thread.is_alive():
break
time.sleep(0.05)
self.assertFalse(blocking_thread.is_alive())
@ -88,13 +85,9 @@ class TestHomeAssistant(unittest.TestCase):
lambda event: calls.append(1))
def raise_keyboardinterrupt(length):
# We don't want to patch the sleep of the timer.
if length == 1:
raise KeyboardInterrupt
raise KeyboardInterrupt
self.hass.start()
with mock.patch('time.sleep', raise_keyboardinterrupt):
with patch('homeassistant.core.time.sleep', raise_keyboardinterrupt):
self.hass.block_till_stopped()
self.assertEqual(1, len(calls))
@ -400,9 +393,10 @@ class TestStateMachine(unittest.TestCase):
def test_last_changed_not_updated_on_same_state(self):
state = self.states.get('light.Bowl')
time.sleep(1)
future = dt_util.utcnow() + timedelta(hours=10)
self.states.set("light.Bowl", "on")
with patch('homeassistant.util.dt.utcnow', return_value=future):
self.states.set("light.Bowl", "on", {'attr': 'triggers_change'})
self.assertEqual(state.last_changed,
self.states.get('light.Bowl').last_changed)

View File

@ -10,7 +10,7 @@ import unittest
import homeassistant.loader as loader
import homeassistant.components.http as http
from common import get_test_home_assistant, MockModule
from tests.common import get_test_home_assistant, MockModule
class TestLoader(unittest.TestCase):
@ -24,9 +24,9 @@ class TestLoader(unittest.TestCase):
def test_set_component(self):
""" Test if set_component works. """
loader.set_component('switch.test', http)
loader.set_component('switch.test_set', http)
self.assertEqual(http, loader.get_component('switch.test'))
self.assertEqual(http, loader.get_component('switch.test_set'))
def test_get_component(self):
""" Test if get_component works. """

View File

@ -6,10 +6,11 @@ Tests Home Assistant util methods.
"""
# pylint: disable=too-many-public-methods
import unittest
import time
from unittest.mock import patch
from datetime import datetime, timedelta
import homeassistant.util as util
from homeassistant import util
import homeassistant.util.dt as dt_util
class TestUtil(unittest.TestCase):
@ -169,21 +170,19 @@ class TestUtil(unittest.TestCase):
def test_throttle(self):
""" Test the add cooldown decorator. """
calls1 = []
calls2 = []
@util.Throttle(timedelta(milliseconds=500))
@util.Throttle(timedelta(seconds=4))
def test_throttle1():
calls1.append(1)
calls2 = []
@util.Throttle(
timedelta(milliseconds=500), timedelta(milliseconds=250))
@util.Throttle(timedelta(seconds=4), timedelta(seconds=2))
def test_throttle2():
calls2.append(1)
# Ensure init is ok
self.assertEqual(0, len(calls1))
self.assertEqual(0, len(calls2))
now = dt_util.utcnow()
plus3 = now + timedelta(seconds=3)
plus5 = plus3 + timedelta(seconds=2)
# Call first time and ensure methods got called
test_throttle1()
@ -206,25 +205,16 @@ class TestUtil(unittest.TestCase):
self.assertEqual(2, len(calls1))
self.assertEqual(1, len(calls2))
# Sleep past the no throttle interval for throttle2
time.sleep(.3)
test_throttle1()
test_throttle2()
with patch('homeassistant.util.utcnow', return_value=plus3):
test_throttle1()
test_throttle2()
self.assertEqual(2, len(calls1))
self.assertEqual(1, len(calls2))
test_throttle1(no_throttle=True)
test_throttle2(no_throttle=True)
with patch('homeassistant.util.utcnow', return_value=plus5):
test_throttle1()
test_throttle2()
self.assertEqual(3, len(calls1))
self.assertEqual(2, len(calls2))
time.sleep(.5)
test_throttle1()
test_throttle2()
self.assertEqual(4, len(calls1))
self.assertEqual(3, len(calls2))