Merge pull request #345 from balloob/device-tracker

Device tracker rewrite
This commit is contained in:
Paulus Schoutsen 2015-09-13 00:15:30 -07:00
commit 134c870d2b
28 changed files with 794 additions and 676 deletions

View File

@ -123,6 +123,7 @@ def prepare_setup_platform(hass, config, domain, platform_name):
# Not found # Not found
if platform is None: if platform is None:
_LOGGER.error('Unable to find platform %s', platform_path)
return None return None
# Already loaded # Already loaded

View File

@ -1,52 +1,82 @@
""" """
homeassistant.components.tracker homeassistant.components.device_tracker
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Provides functionality to keep track of devices. Provides functionality to keep track of devices.
device_tracker:
platform: netgear
# Optional
# How many seconds to wait after not seeing device to consider it not home
consider_home: 180
# Seconds between each scan
interval_seconds: 12
# New found devices auto found
track_new_devices: yes
""" """
import logging
import threading
import os
import csv import csv
from datetime import timedelta from datetime import timedelta
import logging
import os
import threading
from homeassistant.helpers import validate_config from homeassistant.bootstrap import prepare_setup_platform
from homeassistant.helpers.entity import _OVERWRITE from homeassistant.components import discovery, group
from homeassistant.config import load_yaml_config_file
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import config_per_platform
from homeassistant.helpers.entity import Entity
import homeassistant.util as util import homeassistant.util as util
import homeassistant.util.dt as dt_util import homeassistant.util.dt as dt_util
from homeassistant.bootstrap import prepare_setup_platform
from homeassistant.helpers.event import track_utc_time_change from homeassistant.helpers.event import track_utc_time_change
from homeassistant.const import ( from homeassistant.const import (
STATE_HOME, STATE_NOT_HOME, ATTR_ENTITY_PICTURE, ATTR_FRIENDLY_NAME, ATTR_ENTITY_PICTURE, DEVICE_DEFAULT_NAME, STATE_HOME, STATE_NOT_HOME)
CONF_PLATFORM, DEVICE_DEFAULT_NAME)
from homeassistant.components import group
DOMAIN = "device_tracker" DOMAIN = "device_tracker"
DEPENDENCIES = [] DEPENDENCIES = []
SERVICE_DEVICE_TRACKER_RELOAD = "reload_devices_csv"
GROUP_NAME_ALL_DEVICES = 'all devices' GROUP_NAME_ALL_DEVICES = 'all devices'
ENTITY_ID_ALL_DEVICES = group.ENTITY_ID_FORMAT.format('all_devices') ENTITY_ID_ALL_DEVICES = group.ENTITY_ID_FORMAT.format('all_devices')
ENTITY_ID_FORMAT = DOMAIN + '.{}' ENTITY_ID_FORMAT = DOMAIN + '.{}'
# After how much time do we consider a device not home if CSV_DEVICES = "known_devices.csv"
# it does not show up on scans YAML_DEVICES = 'known_devices.yaml'
TIME_DEVICE_NOT_FOUND = timedelta(minutes=3)
# Filename to save known devices to CONF_TRACK_NEW = "track_new_devices"
KNOWN_DEVICES_FILE = "known_devices.csv" DEFAULT_CONF_TRACK_NEW = True
CONF_SECONDS = "interval_seconds" CONF_CONSIDER_HOME = 'consider_home'
DEFAULT_CONF_CONSIDER_HOME = 180 # seconds
DEFAULT_CONF_SECONDS = 12 CONF_SCAN_INTERVAL = "interval_seconds"
DEFAULT_SCAN_INTERVAL = 12
TRACK_NEW_DEVICES = "track_new_devices" CONF_AWAY_HIDE = 'hide_if_away'
DEFAULT_AWAY_HIDE = False
SERVICE_SEE = 'see'
ATTR_LATITUDE = 'latitude'
ATTR_LONGITUDE = 'longitude'
ATTR_MAC = 'mac'
ATTR_DEV_ID = 'dev_id'
ATTR_HOST_NAME = 'host_name'
ATTR_LOCATION_NAME = 'location_name'
ATTR_GPS = 'gps'
DISCOVERY_PLATFORMS = {
discovery.SERVICE_NETGEAR: 'netgear',
}
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
# pylint: disable=too-many-arguments
def is_on(hass, entity_id=None): def is_on(hass, entity_id=None):
""" Returns if any or specified device is home. """ """ Returns if any or specified device is home. """
@ -55,293 +85,309 @@ def is_on(hass, entity_id=None):
return hass.states.is_state(entity, STATE_HOME) return hass.states.is_state(entity, STATE_HOME)
def see(hass, mac=None, dev_id=None, host_name=None, location_name=None,
gps=None):
""" Call service to notify you see device. """
data = {key: value for key, value in
((ATTR_MAC, mac),
(ATTR_DEV_ID, dev_id),
(ATTR_HOST_NAME, host_name),
(ATTR_LOCATION_NAME, location_name),
(ATTR_GPS, gps)) if value is not None}
hass.services.call(DOMAIN, SERVICE_SEE, data)
def setup(hass, config): def setup(hass, config):
""" Sets up the device tracker. """ """ Setup device tracker """
yaml_path = hass.config.path(YAML_DEVICES)
csv_path = hass.config.path(CSV_DEVICES)
if os.path.isfile(csv_path) and not os.path.isfile(yaml_path) and \
convert_csv_config(csv_path, yaml_path):
os.remove(csv_path)
if not validate_config(config, {DOMAIN: [CONF_PLATFORM]}, _LOGGER): conf = config.get(DOMAIN, {})
return False consider_home = util.convert(conf.get(CONF_CONSIDER_HOME), int,
DEFAULT_CONF_CONSIDER_HOME)
track_new = util.convert(conf.get(CONF_TRACK_NEW), bool,
DEFAULT_CONF_TRACK_NEW)
tracker_type = config[DOMAIN].get(CONF_PLATFORM) devices = load_config(yaml_path, hass, timedelta(seconds=consider_home))
tracker = DeviceTracker(hass, consider_home, track_new, devices)
tracker_implementation = \ def setup_platform(p_type, p_config, disc_info=None):
prepare_setup_platform(hass, config, DOMAIN, tracker_type) """ Setup a device tracker platform. """
platform = prepare_setup_platform(hass, config, DOMAIN, p_type)
if tracker_implementation is None: if platform is None:
_LOGGER.error("Unknown device_tracker type specified: %s.",
tracker_type)
return False
device_scanner = tracker_implementation.get_scanner(hass, config)
if device_scanner is None:
_LOGGER.error("Failed to initialize device scanner: %s",
tracker_type)
return False
seconds = util.convert(config[DOMAIN].get(CONF_SECONDS), int,
DEFAULT_CONF_SECONDS)
track_new_devices = config[DOMAIN].get(TRACK_NEW_DEVICES) or False
_LOGGER.info("Tracking new devices: %s", track_new_devices)
tracker = DeviceTracker(hass, device_scanner, seconds, track_new_devices)
# We only succeeded if we got to parse the known devices file
return not tracker.invalid_known_devices_file
class DeviceTracker(object):
""" Class that tracks which devices are home and which are not. """
def __init__(self, hass, device_scanner, seconds, track_new_devices):
self.hass = hass
self.device_scanner = device_scanner
self.lock = threading.Lock()
# Do we track new devices by default?
self.track_new_devices = track_new_devices
# Dictionary to keep track of known devices and devices we track
self.tracked = {}
self.untracked_devices = set()
# Did we encounter an invalid known devices file
self.invalid_known_devices_file = False
# Wrap it in a func instead of lambda so it can be identified in
# the bus by its __name__ attribute.
def update_device_state(now):
""" Triggers update of the device states. """
self.update_devices(now)
dev_group = group.Group(
hass, GROUP_NAME_ALL_DEVICES, user_defined=False)
def reload_known_devices_service(service):
""" Reload known devices file. """
self._read_known_devices_file()
self.update_devices(dt_util.utcnow())
dev_group.update_tracked_entity_ids(self.device_entity_ids)
reload_known_devices_service(None)
if self.invalid_known_devices_file:
return
seconds = range(0, 60, seconds)
_LOGGER.info("Device tracker interval second=%s", seconds)
track_utc_time_change(hass, update_device_state, second=seconds)
hass.services.register(DOMAIN,
SERVICE_DEVICE_TRACKER_RELOAD,
reload_known_devices_service)
@property
def device_entity_ids(self):
""" Returns a set containing all device entity ids
that are being tracked. """
return set(device['entity_id'] for device in self.tracked.values())
def _update_state(self, now, device, is_home):
""" Update the state of a device. """
dev_info = self.tracked[device]
if is_home:
# Update last seen if at home
dev_info['last_seen'] = now
else:
# State remains at home if it has been seen in the last
# TIME_DEVICE_NOT_FOUND
is_home = now - dev_info['last_seen'] < TIME_DEVICE_NOT_FOUND
state = STATE_HOME if is_home else STATE_NOT_HOME
# overwrite properties that have been set in the config file
attr = dict(dev_info['state_attr'])
attr.update(_OVERWRITE.get(dev_info['entity_id'], {}))
self.hass.states.set(
dev_info['entity_id'], state, attr)
def update_devices(self, now):
""" Update device states based on the found devices. """
if not self.lock.acquire(False):
return return
try: try:
found_devices = set(dev.upper() for dev in if hasattr(platform, 'get_scanner'):
self.device_scanner.scan_devices()) scanner = platform.get_scanner(hass, {DOMAIN: p_config})
for device in self.tracked: if scanner is None:
is_home = device in found_devices _LOGGER.error('Error setting up platform %s', p_type)
return
self._update_state(now, device, is_home) setup_scanner_platform(hass, p_config, scanner, tracker.see)
return
if is_home: if not platform.setup_scanner(hass, p_config, tracker.see):
found_devices.remove(device) _LOGGER.error('Error setting up platform %s', p_type)
except Exception: # pylint: disable=broad-except
_LOGGER.exception('Error setting up platform %s', p_type)
# Did we find any devices that we didn't know about yet? for p_type, p_config in \
new_devices = found_devices - self.untracked_devices config_per_platform(config, DOMAIN, _LOGGER):
setup_platform(p_type, p_config)
if new_devices: def device_tracker_discovered(service, info):
if not self.track_new_devices: """ Called when a device tracker platform is discovered. """
self.untracked_devices.update(new_devices) setup_platform(DISCOVERY_PLATFORMS[service], {}, info)
self._update_known_devices_file(new_devices) discovery.listen(hass, DISCOVERY_PLATFORMS.keys(),
finally: device_tracker_discovered)
self.lock.release()
# pylint: disable=too-many-branches def update_stale(now):
def _read_known_devices_file(self): """ Clean up stale devices. """
""" Parse and process the known devices file. """ tracker.update_stale(now)
known_dev_path = self.hass.config.path(KNOWN_DEVICES_FILE) track_utc_time_change(hass, update_stale, second=range(0, 60, 5))
# Return if no known devices file exists tracker.setup_group()
if not os.path.isfile(known_dev_path):
def see_service(call):
""" Service to see a device. """
args = {key: value for key, value in call.data.items() if key in
(ATTR_MAC, ATTR_DEV_ID, ATTR_HOST_NAME, ATTR_LOCATION_NAME,
ATTR_GPS)}
tracker.see(**args)
hass.services.register(DOMAIN, SERVICE_SEE, see_service)
return True
class DeviceTracker(object):
""" Track devices """
def __init__(self, hass, consider_home, track_new, devices):
self.hass = hass
self.devices = {dev.dev_id: dev for dev in devices}
self.mac_to_dev = {dev.mac: dev for dev in devices if dev.mac}
self.consider_home = timedelta(seconds=consider_home)
self.track_new = track_new
self.lock = threading.Lock()
entity_ids = []
for device in devices:
if device.track:
entity_ids.append(device.entity_id)
device.update_ha_state()
self.group = None
def see(self, mac=None, dev_id=None, host_name=None, location_name=None,
gps=None):
""" Notify device tracker that you see a device. """
with self.lock:
if mac is None and dev_id is None:
raise HomeAssistantError('Neither mac or device id passed in')
elif mac is not None:
mac = mac.upper()
device = self.mac_to_dev.get(mac)
if not device:
dev_id = util.slugify(host_name or mac)
else:
dev_id = str(dev_id)
device = self.devices.get(dev_id)
if device:
device.seen(host_name, location_name, gps)
if device.track:
device.update_ha_state()
return
# If no device can be found, create it
device = Device(
self.hass, self.consider_home, self.track_new, dev_id, mac,
(host_name or dev_id).replace('_', ' '))
self.devices[dev_id] = device
if mac is not None:
self.mac_to_dev[mac] = device
device.seen(host_name, location_name, gps)
if device.track:
device.update_ha_state()
# During init, we ignore the group
if self.group is not None:
self.group.update_tracked_entity_ids(
list(self.group.tracking) + [device.entity_id])
update_config(self.hass.config.path(YAML_DEVICES), dev_id, device)
def setup_group(self):
""" Initializes group for all tracked devices. """
entity_ids = (dev.entity_id for dev in self.devices.values()
if dev.track)
self.group = group.setup_group(
self.hass, GROUP_NAME_ALL_DEVICES, entity_ids, False)
def update_stale(self, now):
""" Update stale devices. """
with self.lock:
for device in self.devices.values():
if device.last_update_home and device.stale(now):
device.update_ha_state(True)
class Device(Entity):
""" Tracked device. """
# pylint: disable=too-many-instance-attributes, too-many-arguments
host_name = None
location_name = None
gps = None
last_seen = None
# Track if the last update of this device was HOME
last_update_home = False
_state = STATE_NOT_HOME
def __init__(self, hass, consider_home, track, dev_id, mac, name=None,
picture=None, away_hide=False):
self.hass = hass
self.entity_id = ENTITY_ID_FORMAT.format(dev_id)
# Timedelta object how long we consider a device home if it is not
# detected anymore.
self.consider_home = consider_home
# Device ID
self.dev_id = dev_id
self.mac = mac
# If we should track this device
self.track = track
# Configured name
self.config_name = name
# Configured picture
self.config_picture = picture
self.away_hide = away_hide
@property
def name(self):
""" Returns the name of the entity. """
return self.config_name or self.host_name or DEVICE_DEFAULT_NAME
@property
def state(self):
""" State of the device. """
return self._state
@property
def state_attributes(self):
""" Device state attributes. """
attr = {}
if self.config_picture:
attr[ATTR_ENTITY_PICTURE] = self.config_picture
if self.gps:
attr[ATTR_LATITUDE] = self.gps[0],
attr[ATTR_LONGITUDE] = self.gps[1],
return attr
@property
def hidden(self):
""" If device should be hidden. """
return self.away_hide and self.state != STATE_HOME
def seen(self, host_name=None, location_name=None, gps=None):
""" Mark the device as seen. """
self.last_seen = dt_util.utcnow()
self.host_name = host_name
self.location_name = location_name
self.gps = gps
self.update()
def stale(self, now=None):
""" Return if device state is stale. """
return self.last_seen and \
(now or dt_util.utcnow()) - self.last_seen > self.consider_home
def update(self):
""" Update state of entity. """
if not self.last_seen:
return return
elif self.location_name:
self._state = self.location_name
elif self.stale():
self._state = STATE_NOT_HOME
self.last_update_home = False
else:
self._state = STATE_HOME
self.last_update_home = True
self.lock.acquire()
self.untracked_devices.clear() def convert_csv_config(csv_path, yaml_path):
""" Convert CSV config file format to YAML. """
used_ids = set()
with open(csv_path) as inp:
for row in csv.DictReader(inp):
dev_id = util.ensure_unique_string(util.slugify(row['name']),
used_ids)
used_ids.add(dev_id)
device = Device(None, None, row['track'] == '1', dev_id,
row['device'], row['name'], row['picture'])
update_config(yaml_path, dev_id, device)
return True
with open(known_dev_path) as inp:
# To track which devices need an entity_id assigned def load_config(path, hass, consider_home):
need_entity_id = [] """ Load devices from YAML config file. """
if not os.path.isfile(path):
return []
return [
Device(hass, consider_home, device.get('track', False),
str(dev_id), device.get('mac'), device.get('name'),
device.get('picture'), device.get(CONF_AWAY_HIDE, False))
for dev_id, device in load_yaml_config_file(path).items()]
# All devices that are still in this set after we read the CSV file
# have been removed from the file and thus need to be cleaned up.
removed_devices = set(self.tracked.keys())
try: def setup_scanner_platform(hass, config, scanner, see_device):
for row in csv.DictReader(inp): """ Helper method to connect scanner-based platform to device tracker. """
device = row['device'].upper() interval = util.convert(config.get(CONF_SCAN_INTERVAL), int,
DEFAULT_SCAN_INTERVAL)
if row['track'] == '1': # Initial scan of each mac we also tell about host name for config
if device in self.tracked: seen = set()
# Device exists
removed_devices.remove(device)
else:
# We found a new device
need_entity_id.append(device)
self._track_device(device, row['name']) def device_tracker_scan(now):
""" Called when interval matches. """
for mac in scanner.scan_devices():
if mac in seen:
host_name = None
else:
host_name = scanner.get_device_name(mac)
seen.add(mac)
see_device(mac=mac, host_name=host_name)
# Update state_attr with latest from file track_utc_time_change(hass, device_tracker_scan, second=range(0, 60,
state_attr = { interval))
ATTR_FRIENDLY_NAME: row['name']
}
if row['picture']: device_tracker_scan(None)
state_attr[ATTR_ENTITY_PICTURE] = row['picture']
self.tracked[device]['state_attr'] = state_attr
else: def update_config(path, dev_id, device):
self.untracked_devices.add(device) """ Add device to YAML config file. """
with open(path, 'a') as out:
out.write('\n')
out.write('{}:\n'.format(device.dev_id))
# Remove existing devices that we no longer track for key, value in (('name', device.name), ('mac', device.mac),
for device in removed_devices: ('picture', device.config_picture),
entity_id = self.tracked[device]['entity_id'] ('track', 'yes' if device.track else 'no'),
(CONF_AWAY_HIDE,
_LOGGER.info("Removing entity %s", entity_id) 'yes' if device.away_hide else 'no')):
out.write(' {}: {}\n'.format(key, '' if value is None else value))
self.hass.states.remove(entity_id)
self.tracked.pop(device)
self._generate_entity_ids(need_entity_id)
if not self.tracked:
_LOGGER.warning(
"No devices to track. Please update %s.",
known_dev_path)
_LOGGER.info("Loaded devices from %s", known_dev_path)
except KeyError:
self.invalid_known_devices_file = True
_LOGGER.warning(
("Invalid known devices file: %s. "
"We won't update it with new found devices."),
known_dev_path)
finally:
self.lock.release()
def _update_known_devices_file(self, new_devices):
""" Add new devices to known devices file. """
if not self.invalid_known_devices_file:
known_dev_path = self.hass.config.path(KNOWN_DEVICES_FILE)
try:
# If file does not exist we will write the header too
is_new_file = not os.path.isfile(known_dev_path)
with open(known_dev_path, 'a') as outp:
_LOGGER.info("Found %d new devices, updating %s",
len(new_devices), known_dev_path)
writer = csv.writer(outp)
if is_new_file:
writer.writerow(("device", "name", "track", "picture"))
for device in new_devices:
# See if the device scanner knows the name
# else defaults to unknown device
name = self.device_scanner.get_device_name(device) or \
DEVICE_DEFAULT_NAME
track = 0
if self.track_new_devices:
self._track_device(device, name)
track = 1
writer.writerow((device, name, track, ""))
if self.track_new_devices:
self._generate_entity_ids(new_devices)
except IOError:
_LOGGER.exception("Error updating %s with %d new devices",
known_dev_path, len(new_devices))
def _track_device(self, device, name):
"""
Add a device to the list of tracked devices.
Does not generate the entity id yet.
"""
default_last_seen = dt_util.utcnow().replace(year=1990)
self.tracked[device] = {
'name': name,
'last_seen': default_last_seen,
'state_attr': {ATTR_FRIENDLY_NAME: name}
}
def _generate_entity_ids(self, need_entity_id):
""" Generate entity ids for a list of devices. """
# Setup entity_ids for the new devices
used_entity_ids = [info['entity_id'] for device, info
in self.tracked.items()
if device not in need_entity_id]
for device in need_entity_id:
name = self.tracked[device]['name']
entity_id = util.ensure_unique_string(
ENTITY_ID_FORMAT.format(util.slugify(name)),
used_entity_ids)
used_entity_ids.append(entity_id)
self.tracked[device]['entity_id'] = entity_id

View File

@ -0,0 +1,48 @@
"""
homeassistant.components.device_tracker.mqtt
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
MQTT platform for the device tracker.
device_tracker:
platform: mqtt
qos: 1
devices:
paulus_oneplus: /location/paulus
annetherese_n4: /location/annetherese
"""
import logging
from homeassistant import util
import homeassistant.components.mqtt as mqtt
DEPENDENCIES = ['mqtt']
CONF_QOS = 'qos'
CONF_DEVICES = 'devices'
DEFAULT_QOS = 0
_LOGGER = logging.getLogger(__name__)
def setup_scanner(hass, config, see):
""" Set up a MQTT tracker. """
devices = config.get(CONF_DEVICES)
qos = util.convert(config.get(CONF_QOS), int, DEFAULT_QOS)
if not isinstance(devices, dict):
_LOGGER.error('Expected %s to be a dict, found %s', CONF_DEVICES,
devices)
return False
dev_id_lookup = {}
def device_tracker_message_received(topic, payload, qos):
""" MQTT message received. """
see(dev_id=dev_id_lookup[topic], location_name=payload)
for dev_id, topic in devices.items():
dev_id_lookup[topic] = dev_id
mqtt.subscribe(hass, topic, device_tracker_message_received, qos)
return True

View File

@ -70,7 +70,6 @@ class NetgearDeviceScanner(object):
self.lock = threading.Lock() self.lock = threading.Lock()
if host is None: if host is None:
print("BIER")
self._api = pynetgear.Netgear() self._api = pynetgear.Netgear()
elif username is None: elif username is None:
self._api = pynetgear.Netgear(password, host) self._api = pynetgear.Netgear(password, host)

View File

@ -79,13 +79,6 @@ def setup(hass, config):
if not component: if not component:
return return
# Hack - fix when device_tracker supports discovery
if service == SERVICE_NETGEAR:
bootstrap.setup_component(hass, component, {
'device_tracker': {'platform': 'netgear'}
})
return
# This component cannot be setup. # This component cannot be setup.
if not bootstrap.setup_component(hass, component, config): if not bootstrap.setup_component(hass, component, config):
return return

View File

@ -1,2 +1,2 @@
""" DO NOT MODIFY. Auto-generated by build_frontend script """ """ DO NOT MODIFY. Auto-generated by build_frontend script """
VERSION = "35ecb5457a9ff0f4142c2605b53eb843" VERSION = "8d7dfdebcbbde875470573016b005b73"

File diff suppressed because one or more lines are too long

@ -1 +1 @@
Subproject commit b0b12e20e0f61df849c414c2dfbcf9923f784631 Subproject commit d069489d09e9155c44a0fdbdb3cecdab02d18b5f

View File

@ -60,6 +60,7 @@ MQTT_CLIENT = None
DEFAULT_PORT = 1883 DEFAULT_PORT = 1883
DEFAULT_KEEPALIVE = 60 DEFAULT_KEEPALIVE = 60
DEFAULT_QOS = 0
SERVICE_PUBLISH = 'publish' SERVICE_PUBLISH = 'publish'
EVENT_MQTT_MESSAGE_RECEIVED = 'MQTT_MESSAGE_RECEIVED' EVENT_MQTT_MESSAGE_RECEIVED = 'MQTT_MESSAGE_RECEIVED'
@ -79,17 +80,18 @@ ATTR_PAYLOAD = 'payload'
ATTR_QOS = 'qos' ATTR_QOS = 'qos'
def publish(hass, topic, payload, qos=0): def publish(hass, topic, payload, qos=None):
""" Send an MQTT message. """ """ Send an MQTT message. """
data = { data = {
ATTR_TOPIC: topic, ATTR_TOPIC: topic,
ATTR_PAYLOAD: payload, ATTR_PAYLOAD: payload,
ATTR_QOS: qos,
} }
if qos is not None:
data[ATTR_QOS] = qos
hass.services.call(DOMAIN, SERVICE_PUBLISH, data) hass.services.call(DOMAIN, SERVICE_PUBLISH, data)
def subscribe(hass, topic, callback, qos=0): def subscribe(hass, topic, callback, qos=DEFAULT_QOS):
""" Subscribe to a topic. """ """ Subscribe to a topic. """
def mqtt_topic_subscriber(event): def mqtt_topic_subscriber(event):
""" Match subscribed MQTT topic. """ """ Match subscribed MQTT topic. """
@ -141,7 +143,7 @@ def setup(hass, config):
""" Handle MQTT publish service calls. """ """ Handle MQTT publish service calls. """
msg_topic = call.data.get(ATTR_TOPIC) msg_topic = call.data.get(ATTR_TOPIC)
payload = call.data.get(ATTR_PAYLOAD) payload = call.data.get(ATTR_PAYLOAD)
qos = call.data.get(ATTR_QOS) qos = call.data.get(ATTR_QOS, DEFAULT_QOS)
if msg_topic is None or payload is None: if msg_topic is None or payload is None:
return return
MQTT_CLIENT.publish(msg_topic, payload, qos) MQTT_CLIENT.publish(msg_topic, payload, qos)

View File

@ -1,6 +1,6 @@
""" Constants used by Home Assistant components. """ """ Constants used by Home Assistant components. """
__version__ = "0.7.3dev" __version__ = "0.7.3dev0"
# Can be used to specify a catch all when registering state or event listeners. # Can be used to specify a catch all when registering state or event listeners.
MATCH_ALL = '*' MATCH_ALL = '*'
@ -40,7 +40,7 @@ STATE_ON = 'on'
STATE_OFF = 'off' STATE_OFF = 'off'
STATE_HOME = 'home' STATE_HOME = 'home'
STATE_NOT_HOME = 'not_home' STATE_NOT_HOME = 'not_home'
STATE_UNKNOWN = "unknown" STATE_UNKNOWN = 'unknown'
STATE_OPEN = 'open' STATE_OPEN = 'open'
STATE_CLOSED = 'closed' STATE_CLOSED = 'closed'
STATE_PLAYING = 'playing' STATE_PLAYING = 'playing'

View File

@ -10,8 +10,8 @@ from collections import defaultdict
from homeassistant.exceptions import NoEntitySpecifiedError from homeassistant.exceptions import NoEntitySpecifiedError
from homeassistant.const import ( from homeassistant.const import (
ATTR_FRIENDLY_NAME, ATTR_UNIT_OF_MEASUREMENT, ATTR_HIDDEN, ATTR_FRIENDLY_NAME, ATTR_HIDDEN, ATTR_UNIT_OF_MEASUREMENT,
STATE_ON, STATE_OFF, DEVICE_DEFAULT_NAME, TEMP_CELCIUS, DEVICE_DEFAULT_NAME, STATE_ON, STATE_OFF, STATE_UNKNOWN, TEMP_CELCIUS,
TEMP_FAHRENHEIT) TEMP_FAHRENHEIT)
# Dict mapping entity_id to a boolean that overwrites the hidden property # Dict mapping entity_id to a boolean that overwrites the hidden property
@ -44,17 +44,17 @@ class Entity(object):
@property @property
def name(self): def name(self):
""" Returns the name of the entity. """ """ Returns the name of the entity. """
return self.get_name() return DEVICE_DEFAULT_NAME
@property @property
def state(self): def state(self):
""" Returns the state of the entity. """ """ Returns the state of the entity. """
return self.get_state() return STATE_UNKNOWN
@property @property
def state_attributes(self): def state_attributes(self):
""" Returns the state attributes. """ """ Returns the state attributes. """
return {} return None
@property @property
def unit_of_measurement(self): def unit_of_measurement(self):
@ -64,34 +64,12 @@ class Entity(object):
@property @property
def hidden(self): def hidden(self):
""" Suggestion if the entity should be hidden from UIs. """ """ Suggestion if the entity should be hidden from UIs. """
return self._hidden return False
@hidden.setter
def hidden(self, val):
""" Sets the suggestion for visibility. """
self._hidden = bool(val)
def update(self): def update(self):
""" Retrieve latest state. """ """ Retrieve latest state. """
pass pass
# DEPRECATION NOTICE:
# Device is moving from getters to properties.
# For now the new properties will call the old functions
# This will be removed in the future.
def get_name(self):
""" Returns the name of the entity if any. """
return DEVICE_DEFAULT_NAME
def get_state(self):
""" Returns state of the entity. """
return "Unknown"
def get_state_attributes(self):
""" Returns optional state attributes. """
return None
# DO NOT OVERWRITE # DO NOT OVERWRITE
# These properties and methods are either managed by Home Assistant or they # These properties and methods are either managed by Home Assistant or they
# are used to perform a very specific function. Overwriting these may # are used to perform a very specific function. Overwriting these may

View File

@ -129,13 +129,13 @@ class EntityComponent(object):
if platform is None: if platform is None:
return return
platform_name = '{}.{}'.format(self.domain, platform_type)
try: try:
platform.setup_platform( platform.setup_platform(
self.hass, platform_config, self.add_entities, discovery_info) self.hass, platform_config, self.add_entities, discovery_info)
self.hass.config.components.append(platform_name)
except Exception: # pylint: disable=broad-except except Exception: # pylint: disable=broad-except
self.logger.exception( self.logger.exception(
'Error while setting up platform %s', platform_type) 'Error while setting up platform %s', platform_type)
return
platform_name = '{}.{}'.format(self.domain, platform_type)
self.hass.config.components.append(platform_name)

View File

@ -71,7 +71,7 @@ def ensure_unique_string(preferred_string, current_strings):
""" Returns a string that is not present in current_strings. """ Returns a string that is not present in current_strings.
If preferred string exists will append _2, _3, .. """ If preferred string exists will append _2, _3, .. """
test_string = preferred_string test_string = preferred_string
current_strings = list(current_strings) current_strings = set(current_strings)
tries = 1 tries = 1
@ -244,22 +244,22 @@ class Throttle(object):
Wrapper that allows wrapped to be called only once per min_time. Wrapper that allows wrapped to be called only once per min_time.
If we cannot acquire the lock, it is running so return None. If we cannot acquire the lock, it is running so return None.
""" """
if lock.acquire(False): if not lock.acquire(False):
try: return None
last_call = wrapper.last_call try:
last_call = wrapper.last_call
# Check if method is never called or no_throttle is given # Check if method is never called or no_throttle is given
force = not last_call or kwargs.pop('no_throttle', False) force = not last_call or kwargs.pop('no_throttle', False)
if force or datetime.now() - last_call > self.min_time: if force or utcnow() - last_call > self.min_time:
result = method(*args, **kwargs)
result = method(*args, **kwargs) wrapper.last_call = utcnow()
wrapper.last_call = datetime.now() return result
return result else:
else: return None
return None finally:
finally: lock.release()
lock.release()
wrapper.last_call = None wrapper.last_call = None

View File

@ -10,11 +10,11 @@ from unittest import mock
from homeassistant import core as ha, loader from homeassistant import core as ha, loader
import homeassistant.util.location as location_util import homeassistant.util.location as location_util
import homeassistant.util.dt as dt_util
from homeassistant.helpers.entity import ToggleEntity from homeassistant.helpers.entity import ToggleEntity
from homeassistant.const import ( from homeassistant.const import (
STATE_ON, STATE_OFF, DEVICE_DEFAULT_NAME, EVENT_TIME_CHANGED, STATE_ON, STATE_OFF, DEVICE_DEFAULT_NAME, EVENT_TIME_CHANGED,
EVENT_STATE_CHANGED) EVENT_STATE_CHANGED, EVENT_PLATFORM_DISCOVERED, ATTR_SERVICE,
ATTR_DISCOVERED)
from homeassistant.components import sun, mqtt from homeassistant.components import sun, mqtt
@ -38,8 +38,8 @@ def get_test_home_assistant(num_threads=None):
hass.config.latitude = 32.87336 hass.config.latitude = 32.87336
hass.config.longitude = -117.22743 hass.config.longitude = -117.22743
# if not loader.PREPARED: if 'custom_components.test' not in loader.AVAILABLE_COMPONENTS:
loader. prepare(hass) loader.prepare(hass)
return hass return hass
@ -86,10 +86,11 @@ def fire_time_changed(hass, time):
hass.bus.fire(EVENT_TIME_CHANGED, {'now': time}) hass.bus.fire(EVENT_TIME_CHANGED, {'now': time})
def trigger_device_tracker_scan(hass): def fire_service_discovered(hass, service, info):
""" Triggers the device tracker to scan. """ hass.bus.fire(EVENT_PLATFORM_DISCOVERED, {
fire_time_changed( ATTR_SERVICE: service,
hass, dt_util.utcnow().replace(second=0) + timedelta(hours=1)) ATTR_DISCOVERED: info
})
def ensure_sun_risen(hass): def ensure_sun_risen(hass):

View File

@ -0,0 +1,233 @@
"""
tests.test_component_device_tracker
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Tests the device tracker compoments.
"""
# pylint: disable=protected-access,too-many-public-methods
import unittest
from unittest.mock import patch
from datetime import timedelta
import os
from homeassistant.config import load_yaml_config_file
from homeassistant.loader import get_component
import homeassistant.util.dt as dt_util
from homeassistant.const import (
ATTR_ENTITY_ID, ATTR_ENTITY_PICTURE, ATTR_FRIENDLY_NAME, ATTR_HIDDEN,
STATE_HOME, STATE_NOT_HOME, CONF_PLATFORM, )
import homeassistant.components.device_tracker as device_tracker
from tests.common import (
get_test_home_assistant, fire_time_changed, fire_service_discovered)
class TestComponentsDeviceTracker(unittest.TestCase):
""" Tests homeassistant.components.device_tracker module. """
def setUp(self): # pylint: disable=invalid-name
""" Init needed objects. """
self.hass = get_test_home_assistant()
self.yaml_devices = self.hass.config.path(device_tracker.YAML_DEVICES)
def tearDown(self): # pylint: disable=invalid-name
""" Stop down stuff we started. """
try:
os.remove(self.yaml_devices)
except FileNotFoundError:
pass
self.hass.stop()
def test_is_on(self):
""" Test is_on method. """
entity_id = device_tracker.ENTITY_ID_FORMAT.format('test')
self.hass.states.set(entity_id, STATE_HOME)
self.assertTrue(device_tracker.is_on(self.hass, entity_id))
self.hass.states.set(entity_id, STATE_NOT_HOME)
self.assertFalse(device_tracker.is_on(self.hass, entity_id))
def test_migrating_config(self):
csv_devices = self.hass.config.path(device_tracker.CSV_DEVICES)
self.assertFalse(os.path.isfile(csv_devices))
self.assertFalse(os.path.isfile(self.yaml_devices))
person1 = {
'mac': 'AB:CD:EF:GH:IJ:KL',
'name': 'Paulus',
'track': True,
'picture': 'http://placehold.it/200x200',
}
person2 = {
'mac': 'MN:OP:QR:ST:UV:WX:YZ',
'name': 'Anne Therese',
'track': False,
'picture': None,
}
try:
with open(csv_devices, 'w') as fil:
fil.write('device,name,track,picture\n')
for pers in (person1, person2):
fil.write('{},{},{},{}\n'.format(
pers['mac'], pers['name'],
'1' if pers['track'] else '0', pers['picture'] or ''))
self.assertTrue(device_tracker.setup(self.hass, {}))
self.assertFalse(os.path.isfile(csv_devices))
self.assertTrue(os.path.isfile(self.yaml_devices))
yaml_config = load_yaml_config_file(self.yaml_devices)
self.assertEqual(2, len(yaml_config))
for pers, yaml_pers in zip(
(person2, person1), sorted(yaml_config.values(),
key=lambda pers: pers['name'])):
for key, value in pers.items():
self.assertEqual(value, yaml_pers.get(key))
finally:
try:
os.remove(csv_devices)
except FileNotFoundError:
pass
def test_reading_yaml_config(self):
dev_id = 'test'
device = device_tracker.Device(
self.hass, timedelta(seconds=180), True, dev_id, 'AB:CD:EF:GH:IJ',
'Test name', 'http://test.picture', True)
device_tracker.update_config(self.yaml_devices, dev_id, device)
self.assertTrue(device_tracker.setup(self.hass, {}))
config = device_tracker.load_config(self.yaml_devices, self.hass,
device.consider_home)[0]
self.assertEqual(device.dev_id, config.dev_id)
self.assertEqual(device.track, config.track)
self.assertEqual(device.mac, config.mac)
self.assertEqual(device.config_picture, config.config_picture)
self.assertEqual(device.away_hide, config.away_hide)
self.assertEqual(device.consider_home, config.consider_home)
def test_setup_without_yaml_file(self):
self.assertTrue(device_tracker.setup(self.hass, {}))
def test_adding_unknown_device_to_config(self):
scanner = get_component('device_tracker.test').SCANNER
scanner.reset()
scanner.come_home('DEV1')
self.assertTrue(device_tracker.setup(self.hass, {
device_tracker.DOMAIN: {CONF_PLATFORM: 'test'}}))
config = device_tracker.load_config(self.yaml_devices, self.hass,
timedelta(seconds=0))[0]
self.assertEqual('DEV1', config.dev_id)
self.assertEqual(True, config.track)
def test_discovery(self):
scanner = get_component('device_tracker.test').SCANNER
with patch.dict(device_tracker.DISCOVERY_PLATFORMS, {'test': 'test'}):
with patch.object(scanner, 'scan_devices') as mock_scan:
self.assertTrue(device_tracker.setup(self.hass, {
device_tracker.DOMAIN: {CONF_PLATFORM: 'test'}}))
fire_service_discovered(self.hass, 'test', {})
self.assertTrue(mock_scan.called)
def test_update_stale(self):
scanner = get_component('device_tracker.test').SCANNER
scanner.reset()
scanner.come_home('DEV1')
self.assertTrue(device_tracker.setup(self.hass, {
device_tracker.DOMAIN: {CONF_PLATFORM: 'test'}}))
self.assertEqual(STATE_HOME,
self.hass.states.get('device_tracker.dev1').state)
scanner.leave_home('DEV1')
now = dt_util.utcnow().replace(second=0) + timedelta(hours=1)
with patch('homeassistant.util.dt.utcnow', return_value=now):
fire_time_changed(self.hass, now)
self.hass.pool.block_till_done()
self.assertEqual(STATE_NOT_HOME,
self.hass.states.get('device_tracker.dev1').state)
def test_entity_attributes(self):
dev_id = 'test_entity'
entity_id = device_tracker.ENTITY_ID_FORMAT.format(dev_id)
friendly_name = 'Paulus'
picture = 'http://placehold.it/200x200'
device = device_tracker.Device(
self.hass, timedelta(seconds=180), True, dev_id, None,
friendly_name, picture, away_hide=True)
device_tracker.update_config(self.yaml_devices, dev_id, device)
self.assertTrue(device_tracker.setup(self.hass, {}))
attrs = self.hass.states.get(entity_id).attributes
self.assertEqual(friendly_name, attrs.get(ATTR_FRIENDLY_NAME))
self.assertEqual(picture, attrs.get(ATTR_ENTITY_PICTURE))
def test_device_hidden(self):
dev_id = 'test_entity'
entity_id = device_tracker.ENTITY_ID_FORMAT.format(dev_id)
device = device_tracker.Device(
self.hass, timedelta(seconds=180), True, dev_id, None,
away_hide=True)
device_tracker.update_config(self.yaml_devices, dev_id, device)
scanner = get_component('device_tracker.test').SCANNER
scanner.reset()
self.assertTrue(device_tracker.setup(self.hass, {
device_tracker.DOMAIN: {CONF_PLATFORM: 'test'}}))
self.assertTrue(self.hass.states.get(entity_id)
.attributes.get(ATTR_HIDDEN))
def test_group_all_devices(self):
dev_id = 'test_entity'
entity_id = device_tracker.ENTITY_ID_FORMAT.format(dev_id)
device = device_tracker.Device(
self.hass, timedelta(seconds=180), True, dev_id, None,
away_hide=True)
device_tracker.update_config(self.yaml_devices, dev_id, device)
scanner = get_component('device_tracker.test').SCANNER
scanner.reset()
self.assertTrue(device_tracker.setup(self.hass, {
device_tracker.DOMAIN: {CONF_PLATFORM: 'test'}}))
state = self.hass.states.get(device_tracker.ENTITY_ID_ALL_DEVICES)
self.assertIsNotNone(state)
self.assertEqual(STATE_NOT_HOME, state.state)
self.assertSequenceEqual((entity_id,),
state.attributes.get(ATTR_ENTITY_ID))
@patch('homeassistant.components.device_tracker.DeviceTracker.see')
def test_see_service(self, mock_see):
self.assertTrue(device_tracker.setup(self.hass, {}))
mac = 'AB:CD:EF:GH'
dev_id = 'some_device'
host_name = 'example.com'
location_name = 'Work'
gps = [.3, .8]
device_tracker.see(self.hass, mac, dev_id, host_name, location_name,
gps)
self.hass.pool.block_till_done()
mock_see.assert_called_once_with(
mac=mac, dev_id=dev_id, host_name=host_name,
location_name=location_name, gps=gps)

View File

@ -0,0 +1,37 @@
import unittest
import os
from homeassistant.components import device_tracker
from homeassistant.const import CONF_PLATFORM
from tests.common import (
get_test_home_assistant, mock_mqtt_component, fire_mqtt_message)
class TestComponentsDeviceTrackerMQTT(unittest.TestCase):
def setUp(self): # pylint: disable=invalid-name
""" Init needed objects. """
self.hass = get_test_home_assistant()
mock_mqtt_component(self.hass)
def tearDown(self): # pylint: disable=invalid-name
""" Stop down stuff we started. """
try:
os.remove(self.hass.config.path(device_tracker.YAML_DEVICES))
except FileNotFoundError:
pass
def test_new_message(self):
dev_id = 'paulus'
enttiy_id = device_tracker.ENTITY_ID_FORMAT.format(dev_id)
topic = '/location/paulus'
location = 'work'
self.assertTrue(device_tracker.setup(self.hass, {
device_tracker.DOMAIN: {
CONF_PLATFORM: 'mqtt',
'devices': {dev_id: topic}
}}))
fire_mqtt_message(self.hass, topic, location)
self.hass.pool.block_till_done()
self.assertEqual(location, self.hass.states.get(enttiy_id).state)

View File

@ -5,6 +5,7 @@ tests.test_component_demo
Tests demo component. Tests demo component.
""" """
import unittest import unittest
from unittest.mock import patch
import homeassistant.core as ha import homeassistant.core as ha
import homeassistant.components.demo as demo import homeassistant.components.demo as demo
@ -23,13 +24,15 @@ class TestDemo(unittest.TestCase):
""" Stop down stuff we started. """ """ Stop down stuff we started. """
self.hass.stop() self.hass.stop()
def test_if_demo_state_shows_by_default(self): @patch('homeassistant.components.sun.setup')
def test_if_demo_state_shows_by_default(self, mock_sun_setup):
""" Test if demo state shows if we give no configuration. """ """ Test if demo state shows if we give no configuration. """
demo.setup(self.hass, {demo.DOMAIN: {}}) demo.setup(self.hass, {demo.DOMAIN: {}})
self.assertIsNotNone(self.hass.states.get('a.Demo_Mode')) self.assertIsNotNone(self.hass.states.get('a.Demo_Mode'))
def test_hiding_demo_state(self): @patch('homeassistant.components.sun.setup')
def test_hiding_demo_state(self, mock_sun_setup):
""" Test if you can hide the demo card. """ """ Test if you can hide the demo card. """
demo.setup(self.hass, {demo.DOMAIN: {'hide_demo_state': 1}}) demo.setup(self.hass, {demo.DOMAIN: {'hide_demo_state': 1}})

View File

@ -9,14 +9,14 @@ import os
import unittest import unittest
import homeassistant.loader as loader import homeassistant.loader as loader
from homeassistant.const import CONF_PLATFORM from homeassistant.const import CONF_PLATFORM, STATE_HOME, STATE_NOT_HOME
from homeassistant.components import ( from homeassistant.components import (
device_tracker, light, sun, device_sun_light_trigger) device_tracker, light, sun, device_sun_light_trigger)
from tests.common import ( from tests.common import (
get_test_config_dir, get_test_home_assistant, ensure_sun_risen, get_test_config_dir, get_test_home_assistant, ensure_sun_risen,
ensure_sun_set, trigger_device_tracker_scan) ensure_sun_set)
KNOWN_DEV_PATH = None KNOWN_DEV_PATH = None
@ -27,7 +27,7 @@ def setUpModule(): # pylint: disable=invalid-name
global KNOWN_DEV_PATH global KNOWN_DEV_PATH
KNOWN_DEV_PATH = os.path.join(get_test_config_dir(), KNOWN_DEV_PATH = os.path.join(get_test_config_dir(),
device_tracker.KNOWN_DEVICES_FILE) device_tracker.CSV_DEVICES)
with open(KNOWN_DEV_PATH, 'w') as fil: with open(KNOWN_DEV_PATH, 'w') as fil:
fil.write('device,name,track,picture\n') fil.write('device,name,track,picture\n')
@ -37,7 +37,8 @@ def setUpModule(): # pylint: disable=invalid-name
def tearDownModule(): # pylint: disable=invalid-name def tearDownModule(): # pylint: disable=invalid-name
""" Stops the Home Assistant server. """ """ Stops the Home Assistant server. """
os.remove(KNOWN_DEV_PATH) os.remove(os.path.join(get_test_config_dir(),
device_tracker.YAML_DEVICES))
class TestDeviceSunLightTrigger(unittest.TestCase): class TestDeviceSunLightTrigger(unittest.TestCase):
@ -54,15 +55,16 @@ class TestDeviceSunLightTrigger(unittest.TestCase):
loader.get_component('light.test').init() loader.get_component('light.test').init()
device_tracker.setup(self.hass, { self.assertTrue(device_tracker.setup(self.hass, {
device_tracker.DOMAIN: {CONF_PLATFORM: 'test'} device_tracker.DOMAIN: {CONF_PLATFORM: 'test'}
}) }))
light.setup(self.hass, { self.assertTrue(light.setup(self.hass, {
light.DOMAIN: {CONF_PLATFORM: 'test'} light.DOMAIN: {CONF_PLATFORM: 'test'}
}) }))
sun.setup(self.hass, {sun.DOMAIN: {sun.CONF_ELEVATION: 0}}) self.assertTrue(sun.setup(
self.hass, {sun.DOMAIN: {sun.CONF_ELEVATION: 0}}))
def tearDown(self): # pylint: disable=invalid-name def tearDown(self): # pylint: disable=invalid-name
""" Stop down stuff we started. """ """ Stop down stuff we started. """
@ -71,8 +73,8 @@ class TestDeviceSunLightTrigger(unittest.TestCase):
def test_lights_on_when_sun_sets(self): def test_lights_on_when_sun_sets(self):
""" Test lights go on when there is someone home and the sun sets. """ """ Test lights go on when there is someone home and the sun sets. """
device_sun_light_trigger.setup( self.assertTrue(device_sun_light_trigger.setup(
self.hass, {device_sun_light_trigger.DOMAIN: {}}) self.hass, {device_sun_light_trigger.DOMAIN: {}}))
ensure_sun_risen(self.hass) ensure_sun_risen(self.hass)
@ -92,12 +94,11 @@ class TestDeviceSunLightTrigger(unittest.TestCase):
self.hass.pool.block_till_done() self.hass.pool.block_till_done()
device_sun_light_trigger.setup( self.assertTrue(device_sun_light_trigger.setup(
self.hass, {device_sun_light_trigger.DOMAIN: {}}) self.hass, {device_sun_light_trigger.DOMAIN: {}}))
self.scanner.leave_home('DEV1') self.hass.states.set(device_tracker.ENTITY_ID_ALL_DEVICES,
STATE_NOT_HOME)
trigger_device_tracker_scan(self.hass)
self.hass.pool.block_till_done() self.hass.pool.block_till_done()
@ -111,11 +112,11 @@ class TestDeviceSunLightTrigger(unittest.TestCase):
self.hass.pool.block_till_done() self.hass.pool.block_till_done()
device_sun_light_trigger.setup( self.assertTrue(device_sun_light_trigger.setup(
self.hass, {device_sun_light_trigger.DOMAIN: {}}) self.hass, {device_sun_light_trigger.DOMAIN: {}}))
self.scanner.come_home('DEV2') self.hass.states.set(
trigger_device_tracker_scan(self.hass) device_tracker.ENTITY_ID_FORMAT.format('device_2'), STATE_HOME)
self.hass.pool.block_till_done() self.hass.pool.block_till_done()

View File

@ -1,193 +0,0 @@
"""
tests.test_component_device_tracker
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Tests the device tracker compoments.
"""
# pylint: disable=protected-access,too-many-public-methods
import unittest
from datetime import timedelta
import os
import homeassistant.core as ha
import homeassistant.loader as loader
import homeassistant.util.dt as dt_util
from homeassistant.const import (
STATE_HOME, STATE_NOT_HOME, ATTR_ENTITY_PICTURE, CONF_PLATFORM,
DEVICE_DEFAULT_NAME)
import homeassistant.components.device_tracker as device_tracker
from tests.common import get_test_home_assistant
class TestComponentsDeviceTracker(unittest.TestCase):
""" Tests homeassistant.components.device_tracker module. """
def setUp(self): # pylint: disable=invalid-name
""" Init needed objects. """
self.hass = get_test_home_assistant()
self.known_dev_path = self.hass.config.path(
device_tracker.KNOWN_DEVICES_FILE)
def tearDown(self): # pylint: disable=invalid-name
""" Stop down stuff we started. """
self.hass.stop()
if os.path.isfile(self.known_dev_path):
os.remove(self.known_dev_path)
def test_is_on(self):
""" Test is_on method. """
entity_id = device_tracker.ENTITY_ID_FORMAT.format('test')
self.hass.states.set(entity_id, STATE_HOME)
self.assertTrue(device_tracker.is_on(self.hass, entity_id))
self.hass.states.set(entity_id, STATE_NOT_HOME)
self.assertFalse(device_tracker.is_on(self.hass, entity_id))
def test_setup(self):
""" Test setup method. """
# Bogus config
self.assertFalse(device_tracker.setup(self.hass, {}))
self.assertFalse(
device_tracker.setup(self.hass, {device_tracker.DOMAIN: {}}))
# Test with non-existing component
self.assertFalse(device_tracker.setup(
self.hass, {device_tracker.DOMAIN: {CONF_PLATFORM: 'nonexisting'}}
))
# Test with a bad known device file around
with open(self.known_dev_path, 'w') as fil:
fil.write("bad data\nbad data\n")
self.assertFalse(device_tracker.setup(self.hass, {
device_tracker.DOMAIN: {CONF_PLATFORM: 'test'}
}))
def test_writing_known_devices_file(self):
""" Test the device tracker class. """
scanner = loader.get_component(
'device_tracker.test').get_scanner(None, None)
scanner.reset()
scanner.come_home('DEV1')
scanner.come_home('DEV2')
self.assertTrue(device_tracker.setup(self.hass, {
device_tracker.DOMAIN: {CONF_PLATFORM: 'test'}
}))
# Ensure a new known devices file has been created.
# Since the device_tracker uses a set internally we cannot
# know what the order of the devices in the known devices file is.
# To ensure all the three expected lines are there, we sort the file
with open(self.known_dev_path) as fil:
self.assertEqual(
['DEV1,{},0,\n'.format(DEVICE_DEFAULT_NAME), 'DEV2,dev2,0,\n',
'device,name,track,picture\n'],
sorted(fil))
# Write one where we track dev1, dev2
with open(self.known_dev_path, 'w') as fil:
fil.write('device,name,track,picture\n')
fil.write('DEV1,device 1,1,http://example.com/dev1.jpg\n')
fil.write('DEV2,device 2,1,http://example.com/dev2.jpg\n')
scanner.leave_home('DEV1')
scanner.come_home('DEV3')
self.hass.services.call(
device_tracker.DOMAIN,
device_tracker.SERVICE_DEVICE_TRACKER_RELOAD)
self.hass.pool.block_till_done()
dev1 = device_tracker.ENTITY_ID_FORMAT.format('device_1')
dev2 = device_tracker.ENTITY_ID_FORMAT.format('device_2')
dev3 = device_tracker.ENTITY_ID_FORMAT.format('DEV3')
now = dt_util.utcnow()
# Device scanner scans every 12 seconds. We need to sync our times to
# be every 12 seconds or else the time_changed event will be ignored.
nowAlmostMinimumGone = now + device_tracker.TIME_DEVICE_NOT_FOUND
nowAlmostMinimumGone -= timedelta(
seconds=12+(nowAlmostMinimumGone.second % 12))
nowMinimumGone = now + device_tracker.TIME_DEVICE_NOT_FOUND
nowMinimumGone += timedelta(seconds=12-(nowMinimumGone.second % 12))
# Test initial is correct
self.assertTrue(device_tracker.is_on(self.hass))
self.assertFalse(device_tracker.is_on(self.hass, dev1))
self.assertTrue(device_tracker.is_on(self.hass, dev2))
self.assertIsNone(self.hass.states.get(dev3))
self.assertEqual(
'http://example.com/dev1.jpg',
self.hass.states.get(dev1).attributes.get(ATTR_ENTITY_PICTURE))
self.assertEqual(
'http://example.com/dev2.jpg',
self.hass.states.get(dev2).attributes.get(ATTR_ENTITY_PICTURE))
# Test if dev3 got added to known dev file
with open(self.known_dev_path) as fil:
self.assertEqual('DEV3,dev3,0,\n', list(fil)[-1])
# Change dev3 to track
with open(self.known_dev_path, 'w') as fil:
fil.write("device,name,track,picture\n")
fil.write('DEV1,Device 1,1,http://example.com/picture.jpg\n')
fil.write('DEV2,Device 2,1,http://example.com/picture.jpg\n')
fil.write('DEV3,DEV3,1,\n')
scanner.come_home('DEV1')
scanner.leave_home('DEV2')
# reload dev file
self.hass.services.call(
device_tracker.DOMAIN,
device_tracker.SERVICE_DEVICE_TRACKER_RELOAD)
self.hass.pool.block_till_done()
# Test what happens if a device comes home and another leaves
self.assertTrue(device_tracker.is_on(self.hass))
self.assertTrue(device_tracker.is_on(self.hass, dev1))
# Dev2 will still be home because of the error margin on time
self.assertTrue(device_tracker.is_on(self.hass, dev2))
# dev3 should be tracked now after we reload the known devices
self.assertTrue(device_tracker.is_on(self.hass, dev3))
self.assertIsNone(
self.hass.states.get(dev3).attributes.get(ATTR_ENTITY_PICTURE))
# Test if device leaves what happens, test the time span
self.hass.bus.fire(
ha.EVENT_TIME_CHANGED, {ha.ATTR_NOW: nowAlmostMinimumGone})
self.hass.pool.block_till_done()
self.assertTrue(device_tracker.is_on(self.hass))
self.assertTrue(device_tracker.is_on(self.hass, dev1))
# Dev2 will still be home because of the error time
self.assertTrue(device_tracker.is_on(self.hass, dev2))
self.assertTrue(device_tracker.is_on(self.hass, dev3))
# Now test if gone for longer then error margin
self.hass.bus.fire(
ha.EVENT_TIME_CHANGED, {ha.ATTR_NOW: nowMinimumGone})
self.hass.pool.block_till_done()
self.assertTrue(device_tracker.is_on(self.hass))
self.assertTrue(device_tracker.is_on(self.hass, dev1))
self.assertFalse(device_tracker.is_on(self.hass, dev2))
self.assertTrue(device_tracker.is_on(self.hass, dev3))

View File

@ -8,6 +8,8 @@ Tests the history component.
import time import time
import os import os
import unittest import unittest
from unittest.mock import patch
from datetime import timedelta
import homeassistant.core as ha import homeassistant.core as ha
import homeassistant.util.dt as dt_util import homeassistant.util.dt as dt_util
@ -68,11 +70,7 @@ class TestComponentHistory(unittest.TestCase):
self.init_recorder() self.init_recorder()
states = [] states = []
# Create 10 states for 5 different entities for i in range(5):
# After the first 5, sleep a second and save the time
# history.get_states takes the latest states BEFORE point X
for i in range(10):
state = ha.State( state = ha.State(
'test.point_in_time_{}'.format(i % 5), 'test.point_in_time_{}'.format(i % 5),
"State {}".format(i), "State {}".format(i),
@ -80,19 +78,27 @@ class TestComponentHistory(unittest.TestCase):
mock_state_change_event(self.hass, state) mock_state_change_event(self.hass, state)
self.hass.pool.block_till_done() self.hass.pool.block_till_done()
recorder._INSTANCE.block_till_done()
if i < 5: states.append(state)
states.append(state)
if i == 4: recorder._INSTANCE.block_till_done()
time.sleep(1)
point = dt_util.utcnow()
self.assertEqual( point = dt_util.utcnow() + timedelta(seconds=1)
states,
sorted( with patch('homeassistant.util.dt.utcnow', return_value=point):
history.get_states(point), key=lambda state: state.entity_id)) for i in range(5):
state = ha.State(
'test.point_in_time_{}'.format(i % 5),
"State {}".format(i),
{'attribute_test': i})
mock_state_change_event(self.hass, state)
self.hass.pool.block_till_done()
# Get states returns everything before POINT
self.assertEqual(states,
sorted(history.get_states(point),
key=lambda state: state.entity_id))
# Test get_state here because we have a DB setup # Test get_state here because we have a DB setup
self.assertEqual( self.assertEqual(
@ -113,22 +119,20 @@ class TestComponentHistory(unittest.TestCase):
set_state('YouTube') set_state('YouTube')
start = dt_util.utcnow() start = dt_util.utcnow()
point = start + timedelta(seconds=1)
end = point + timedelta(seconds=1)
time.sleep(1) with patch('homeassistant.util.dt.utcnow', return_value=point):
states = [
set_state('idle'),
set_state('Netflix'),
set_state('Plex'),
set_state('YouTube'),
]
states = [ with patch('homeassistant.util.dt.utcnow', return_value=end):
set_state('idle'), set_state('Netflix')
set_state('Netflix'), set_state('Plex')
set_state('Plex'),
set_state('YouTube'),
]
time.sleep(1)
end = dt_util.utcnow()
set_state('Netflix')
set_state('Plex')
self.assertEqual( self.assertEqual(
{entity_id: states}, {entity_id: states},

View File

@ -7,9 +7,9 @@ Tests switch component.
# pylint: disable=too-many-public-methods,protected-access # pylint: disable=too-many-public-methods,protected-access
import unittest import unittest
import homeassistant.loader as loader from homeassistant import loader
from homeassistant.components import switch
from homeassistant.const import STATE_ON, STATE_OFF, CONF_PLATFORM from homeassistant.const import STATE_ON, STATE_OFF, CONF_PLATFORM
import homeassistant.components.switch as switch
from tests.common import get_test_home_assistant from tests.common import get_test_home_assistant

View File

@ -34,14 +34,6 @@ class TestHelpersEntity(unittest.TestCase):
ATTR_HIDDEN, ATTR_HIDDEN,
self.hass.states.get(self.entity.entity_id).attributes) self.hass.states.get(self.entity.entity_id).attributes)
def test_setting_hidden_to_true(self):
self.entity.hidden = True
self.entity.update_ha_state()
state = self.hass.states.get(self.entity.entity_id)
self.assertTrue(state.attributes.get(ATTR_HIDDEN))
def test_overwriting_hidden_property_to_true(self): def test_overwriting_hidden_property_to_true(self):
""" Test we can overwrite hidden property to True. """ """ Test we can overwrite hidden property to True. """
entity.Entity.overwrite_attribute(self.entity.entity_id, entity.Entity.overwrite_attribute(self.entity.entity_id,
@ -50,14 +42,3 @@ class TestHelpersEntity(unittest.TestCase):
state = self.hass.states.get(self.entity.entity_id) state = self.hass.states.get(self.entity.entity_id)
self.assertTrue(state.attributes.get(ATTR_HIDDEN)) self.assertTrue(state.attributes.get(ATTR_HIDDEN))
def test_overwriting_hidden_property_to_false(self):
""" Test we can overwrite hidden property to True. """
entity.Entity.overwrite_attribute(self.entity.entity_id,
[ATTR_HIDDEN], [False])
self.entity.hidden = True
self.entity.update_ha_state()
self.assertNotIn(
ATTR_HIDDEN,
self.hass.states.get(self.entity.entity_id).attributes)

View File

@ -7,13 +7,13 @@ Tests component helpers.
# pylint: disable=protected-access,too-many-public-methods # pylint: disable=protected-access,too-many-public-methods
import unittest import unittest
from common import get_test_home_assistant
import homeassistant.core as ha import homeassistant.core as ha
import homeassistant.loader as loader import homeassistant.loader as loader
from homeassistant.const import STATE_ON, STATE_OFF, ATTR_ENTITY_ID from homeassistant.const import STATE_ON, STATE_OFF, ATTR_ENTITY_ID
from homeassistant.helpers import extract_entity_ids from homeassistant.helpers import extract_entity_ids
from tests.common import get_test_home_assistant
class TestComponentsCore(unittest.TestCase): class TestComponentsCore(unittest.TestCase):
""" Tests homeassistant.components module. """ """ Tests homeassistant.components module. """

View File

@ -15,7 +15,7 @@ from homeassistant.const import (
CONF_LATITUDE, CONF_LONGITUDE, CONF_TEMPERATURE_UNIT, CONF_NAME, CONF_LATITUDE, CONF_LONGITUDE, CONF_TEMPERATURE_UNIT, CONF_NAME,
CONF_TIME_ZONE) CONF_TIME_ZONE)
from common import get_test_config_dir, mock_detect_location_info from tests.common import get_test_config_dir, mock_detect_location_info
CONFIG_DIR = get_test_config_dir() CONFIG_DIR = get_test_config_dir()
YAML_PATH = os.path.join(CONFIG_DIR, config_util.YAML_CONFIG_FILE) YAML_PATH = os.path.join(CONFIG_DIR, config_util.YAML_CONFIG_FILE)

View File

@ -8,10 +8,10 @@ Provides tests to verify that Home Assistant core works.
# pylint: disable=too-few-public-methods # pylint: disable=too-few-public-methods
import os import os
import unittest import unittest
import unittest.mock as mock from unittest.mock import patch
import time import time
import threading import threading
from datetime import datetime from datetime import datetime, timedelta
import pytz import pytz
@ -55,29 +55,26 @@ class TestHomeAssistant(unittest.TestCase):
self.hass.pool.block_till_done() self.hass.pool.block_till_done()
self.assertEqual(1, len(calls)) self.assertEqual(1, len(calls))
# @patch('homeassistant.core.time.sleep')
def test_block_till_stoped(self): def test_block_till_stoped(self):
""" Test if we can block till stop service is called. """ """ Test if we can block till stop service is called. """
blocking_thread = threading.Thread(target=self.hass.block_till_stopped) with patch('time.sleep'):
blocking_thread = threading.Thread(
target=self.hass.block_till_stopped)
self.assertFalse(blocking_thread.is_alive()) self.assertFalse(blocking_thread.is_alive())
blocking_thread.start() blocking_thread.start()
# Threads are unpredictable, try 20 times if we're ready self.assertTrue(blocking_thread.is_alive())
wait_loops = 0
while not blocking_thread.is_alive() and wait_loops < 20:
wait_loops += 1
time.sleep(0.05)
self.assertTrue(blocking_thread.is_alive()) self.hass.services.call(ha.DOMAIN, ha.SERVICE_HOMEASSISTANT_STOP)
self.hass.pool.block_till_done()
self.hass.services.call(ha.DOMAIN, ha.SERVICE_HOMEASSISTANT_STOP) # Wait for thread to stop
self.hass.pool.block_till_done() for _ in range(20):
if not blocking_thread.is_alive():
# Threads are unpredictable, try 20 times if we're ready break
wait_loops = 0
while blocking_thread.is_alive() and wait_loops < 20:
wait_loops += 1
time.sleep(0.05) time.sleep(0.05)
self.assertFalse(blocking_thread.is_alive()) self.assertFalse(blocking_thread.is_alive())
@ -88,13 +85,9 @@ class TestHomeAssistant(unittest.TestCase):
lambda event: calls.append(1)) lambda event: calls.append(1))
def raise_keyboardinterrupt(length): def raise_keyboardinterrupt(length):
# We don't want to patch the sleep of the timer. raise KeyboardInterrupt
if length == 1:
raise KeyboardInterrupt
self.hass.start() with patch('homeassistant.core.time.sleep', raise_keyboardinterrupt):
with mock.patch('time.sleep', raise_keyboardinterrupt):
self.hass.block_till_stopped() self.hass.block_till_stopped()
self.assertEqual(1, len(calls)) self.assertEqual(1, len(calls))
@ -400,9 +393,10 @@ class TestStateMachine(unittest.TestCase):
def test_last_changed_not_updated_on_same_state(self): def test_last_changed_not_updated_on_same_state(self):
state = self.states.get('light.Bowl') state = self.states.get('light.Bowl')
time.sleep(1) future = dt_util.utcnow() + timedelta(hours=10)
self.states.set("light.Bowl", "on") with patch('homeassistant.util.dt.utcnow', return_value=future):
self.states.set("light.Bowl", "on", {'attr': 'triggers_change'})
self.assertEqual(state.last_changed, self.assertEqual(state.last_changed,
self.states.get('light.Bowl').last_changed) self.states.get('light.Bowl').last_changed)

View File

@ -10,7 +10,7 @@ import unittest
import homeassistant.loader as loader import homeassistant.loader as loader
import homeassistant.components.http as http import homeassistant.components.http as http
from common import get_test_home_assistant, MockModule from tests.common import get_test_home_assistant, MockModule
class TestLoader(unittest.TestCase): class TestLoader(unittest.TestCase):
@ -24,9 +24,9 @@ class TestLoader(unittest.TestCase):
def test_set_component(self): def test_set_component(self):
""" Test if set_component works. """ """ Test if set_component works. """
loader.set_component('switch.test', http) loader.set_component('switch.test_set', http)
self.assertEqual(http, loader.get_component('switch.test')) self.assertEqual(http, loader.get_component('switch.test_set'))
def test_get_component(self): def test_get_component(self):
""" Test if get_component works. """ """ Test if get_component works. """

View File

@ -6,10 +6,11 @@ Tests Home Assistant util methods.
""" """
# pylint: disable=too-many-public-methods # pylint: disable=too-many-public-methods
import unittest import unittest
import time from unittest.mock import patch
from datetime import datetime, timedelta from datetime import datetime, timedelta
import homeassistant.util as util from homeassistant import util
import homeassistant.util.dt as dt_util
class TestUtil(unittest.TestCase): class TestUtil(unittest.TestCase):
@ -169,21 +170,19 @@ class TestUtil(unittest.TestCase):
def test_throttle(self): def test_throttle(self):
""" Test the add cooldown decorator. """ """ Test the add cooldown decorator. """
calls1 = [] calls1 = []
calls2 = []
@util.Throttle(timedelta(milliseconds=500)) @util.Throttle(timedelta(seconds=4))
def test_throttle1(): def test_throttle1():
calls1.append(1) calls1.append(1)
calls2 = [] @util.Throttle(timedelta(seconds=4), timedelta(seconds=2))
@util.Throttle(
timedelta(milliseconds=500), timedelta(milliseconds=250))
def test_throttle2(): def test_throttle2():
calls2.append(1) calls2.append(1)
# Ensure init is ok now = dt_util.utcnow()
self.assertEqual(0, len(calls1)) plus3 = now + timedelta(seconds=3)
self.assertEqual(0, len(calls2)) plus5 = plus3 + timedelta(seconds=2)
# Call first time and ensure methods got called # Call first time and ensure methods got called
test_throttle1() test_throttle1()
@ -206,25 +205,16 @@ class TestUtil(unittest.TestCase):
self.assertEqual(2, len(calls1)) self.assertEqual(2, len(calls1))
self.assertEqual(1, len(calls2)) self.assertEqual(1, len(calls2))
# Sleep past the no throttle interval for throttle2 with patch('homeassistant.util.utcnow', return_value=plus3):
time.sleep(.3) test_throttle1()
test_throttle2()
test_throttle1()
test_throttle2()
self.assertEqual(2, len(calls1)) self.assertEqual(2, len(calls1))
self.assertEqual(1, len(calls2)) self.assertEqual(1, len(calls2))
test_throttle1(no_throttle=True) with patch('homeassistant.util.utcnow', return_value=plus5):
test_throttle2(no_throttle=True) test_throttle1()
test_throttle2()
self.assertEqual(3, len(calls1)) self.assertEqual(3, len(calls1))
self.assertEqual(2, len(calls2)) self.assertEqual(2, len(calls2))
time.sleep(.5)
test_throttle1()
test_throttle2()
self.assertEqual(4, len(calls1))
self.assertEqual(3, len(calls2))