mirror of
https://github.com/home-assistant/core.git
synced 2025-04-24 01:08:12 +00:00
Device tracker component & platform validation. No more home_range. (#2908)
* Device tracker component & platform validation. No more home_range. * Mock, bluetooth * Renamed _CONFIG_SCHEMA. Raise warning for #1606 * test duplicates * Fix assert * Coverage * Typing * T fixes
This commit is contained in:
parent
16e0187fcc
commit
55d305359e
@ -10,14 +10,19 @@ from datetime import timedelta
|
||||
import logging
|
||||
import os
|
||||
import threading
|
||||
from typing import Any, Sequence, Callable
|
||||
|
||||
from homeassistant.bootstrap import prepare_setup_platform
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.bootstrap import (
|
||||
prepare_setup_platform, log_exception)
|
||||
from homeassistant.components import group, zone
|
||||
from homeassistant.components.discovery import SERVICE_NETGEAR
|
||||
from homeassistant.config import load_yaml_config_file
|
||||
from homeassistant.exceptions import HomeAssistantError
|
||||
from homeassistant.helpers import config_per_platform, discovery
|
||||
from homeassistant.helpers.entity import Entity
|
||||
from homeassistant.helpers.typing import GPSType, ConfigType, HomeAssistantType
|
||||
import homeassistant.helpers.config_validation as cv
|
||||
import homeassistant.util as util
|
||||
import homeassistant.util.dt as dt_util
|
||||
@ -27,8 +32,7 @@ from homeassistant.const import (
|
||||
ATTR_GPS_ACCURACY, ATTR_LATITUDE, ATTR_LONGITUDE,
|
||||
DEVICE_DEFAULT_NAME, STATE_HOME, STATE_NOT_HOME)
|
||||
|
||||
PLATFORM_SCHEMA = cv.PLATFORM_SCHEMA
|
||||
DOMAIN = "device_tracker"
|
||||
DOMAIN = 'device_tracker'
|
||||
DEPENDENCIES = ['zone']
|
||||
|
||||
GROUP_NAME_ALL_DEVICES = 'all devices'
|
||||
@ -38,21 +42,18 @@ ENTITY_ID_FORMAT = DOMAIN + '.{}'
|
||||
|
||||
YAML_DEVICES = 'known_devices.yaml'
|
||||
|
||||
CONF_TRACK_NEW = "track_new_devices"
|
||||
DEFAULT_CONF_TRACK_NEW = True
|
||||
CONF_TRACK_NEW = 'track_new_devices'
|
||||
DEFAULT_TRACK_NEW = True
|
||||
|
||||
CONF_CONSIDER_HOME = 'consider_home'
|
||||
DEFAULT_CONSIDER_HOME = 180 # seconds
|
||||
|
||||
CONF_SCAN_INTERVAL = "interval_seconds"
|
||||
CONF_SCAN_INTERVAL = 'interval_seconds'
|
||||
DEFAULT_SCAN_INTERVAL = 12
|
||||
|
||||
CONF_AWAY_HIDE = 'hide_if_away'
|
||||
DEFAULT_AWAY_HIDE = False
|
||||
|
||||
CONF_HOME_RANGE = 'home_range'
|
||||
DEFAULT_HOME_RANGE = 100
|
||||
|
||||
SERVICE_SEE = 'see'
|
||||
|
||||
ATTR_MAC = 'mac'
|
||||
@ -62,23 +63,33 @@ ATTR_LOCATION_NAME = 'location_name'
|
||||
ATTR_GPS = 'gps'
|
||||
ATTR_BATTERY = 'battery'
|
||||
|
||||
PLATFORM_SCHEMA = cv.PLATFORM_SCHEMA.extend({
|
||||
vol.Optional(CONF_SCAN_INTERVAL): cv.positive_int, # seconds
|
||||
}, extra=vol.ALLOW_EXTRA)
|
||||
|
||||
_CONFIG_SCHEMA = vol.Schema({DOMAIN: vol.All(cv.ensure_list, [
|
||||
vol.Schema({
|
||||
vol.Optional(CONF_TRACK_NEW): cv.boolean,
|
||||
vol.Optional(CONF_CONSIDER_HOME): cv.positive_int # seconds
|
||||
}, extra=vol.ALLOW_EXTRA)])}, extra=vol.ALLOW_EXTRA)
|
||||
|
||||
DISCOVERY_PLATFORMS = {
|
||||
SERVICE_NETGEAR: 'netgear',
|
||||
}
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
# pylint: disable=too-many-arguments
|
||||
|
||||
|
||||
def is_on(hass, entity_id=None):
|
||||
def is_on(hass: HomeAssistantType, entity_id: str=None):
|
||||
"""Return the state if any or a specified device is home."""
|
||||
entity = entity_id or ENTITY_ID_ALL_DEVICES
|
||||
|
||||
return hass.states.is_state(entity, STATE_HOME)
|
||||
|
||||
|
||||
def see(hass, mac=None, dev_id=None, host_name=None, location_name=None,
|
||||
gps=None, gps_accuracy=None, battery=None):
|
||||
def see(hass: HomeAssistantType, mac: str=None, dev_id: str=None,
|
||||
host_name: str=None, location_name: str=None,
|
||||
gps: GPSType=None, gps_accuracy=None,
|
||||
battery=None): # pylint: disable=too-many-arguments
|
||||
"""Call service to notify you see device."""
|
||||
data = {key: value for key, value in
|
||||
((ATTR_MAC, mac),
|
||||
@ -91,27 +102,24 @@ def see(hass, mac=None, dev_id=None, host_name=None, location_name=None,
|
||||
hass.services.call(DOMAIN, SERVICE_SEE, data)
|
||||
|
||||
|
||||
def setup(hass, config):
|
||||
def setup(hass: HomeAssistantType, config: ConfigType):
|
||||
"""Setup device tracker."""
|
||||
yaml_path = hass.config.path(YAML_DEVICES)
|
||||
|
||||
conf = config.get(DOMAIN, {})
|
||||
|
||||
# Config can be an empty list. In that case, substitute a dict
|
||||
if isinstance(conf, list):
|
||||
try:
|
||||
conf = _CONFIG_SCHEMA(config).get(DOMAIN, [])
|
||||
except vol.Invalid as ex:
|
||||
log_exception(ex, DOMAIN, config)
|
||||
return False
|
||||
else:
|
||||
conf = conf[0] if len(conf) > 0 else {}
|
||||
consider_home = timedelta(
|
||||
seconds=conf.get(CONF_CONSIDER_HOME, DEFAULT_CONSIDER_HOME))
|
||||
track_new = conf.get(CONF_TRACK_NEW, DEFAULT_TRACK_NEW)
|
||||
|
||||
consider_home = timedelta(
|
||||
seconds=util.convert(conf.get(CONF_CONSIDER_HOME), int,
|
||||
DEFAULT_CONSIDER_HOME))
|
||||
track_new = util.convert(conf.get(CONF_TRACK_NEW), bool,
|
||||
DEFAULT_CONF_TRACK_NEW)
|
||||
home_range = util.convert(conf.get(CONF_HOME_RANGE), int,
|
||||
DEFAULT_HOME_RANGE)
|
||||
devices = load_config(yaml_path, hass, consider_home)
|
||||
|
||||
devices = load_config(yaml_path, hass, consider_home, home_range)
|
||||
tracker = DeviceTracker(hass, consider_home, track_new, home_range,
|
||||
devices)
|
||||
tracker = DeviceTracker(hass, consider_home, track_new, devices)
|
||||
|
||||
def setup_platform(p_type, p_config, disc_info=None):
|
||||
"""Setup a device tracker platform."""
|
||||
@ -170,30 +178,37 @@ def setup(hass, config):
|
||||
class DeviceTracker(object):
|
||||
"""Representation of a device tracker."""
|
||||
|
||||
def __init__(self, hass, consider_home, track_new, home_range, devices):
|
||||
def __init__(self, hass: HomeAssistantType, consider_home: timedelta,
|
||||
track_new: bool, devices: Sequence) -> None:
|
||||
"""Initialize a device tracker."""
|
||||
self.hass = hass
|
||||
self.devices = {dev.dev_id: dev for dev in devices}
|
||||
self.mac_to_dev = {dev.mac: dev for dev in devices if dev.mac}
|
||||
for dev in devices:
|
||||
if self.devices[dev.dev_id] is not dev:
|
||||
_LOGGER.warning('Duplicate device IDs detected %s', dev.dev_id)
|
||||
if dev.mac and self.mac_to_dev[dev.mac] is not dev:
|
||||
_LOGGER.warning('Duplicate device MAC addresses detected %s',
|
||||
dev.mac)
|
||||
self.consider_home = consider_home
|
||||
self.track_new = track_new
|
||||
self.home_range = home_range
|
||||
self.lock = threading.Lock()
|
||||
|
||||
for device in devices:
|
||||
if device.track:
|
||||
device.update_ha_state()
|
||||
|
||||
self.group = None
|
||||
self.group = None # type: group.Group
|
||||
|
||||
def see(self, mac=None, dev_id=None, host_name=None, location_name=None,
|
||||
gps=None, gps_accuracy=None, battery=None):
|
||||
def see(self, mac: str=None, dev_id: str=None, host_name: str=None,
|
||||
location_name: str=None, gps: GPSType=None, gps_accuracy=None,
|
||||
battery: str=None):
|
||||
"""Notify the device tracker that you see a device."""
|
||||
with self.lock:
|
||||
if mac is None and dev_id is None:
|
||||
raise HomeAssistantError('Neither mac or device id passed in')
|
||||
elif mac is not None:
|
||||
mac = mac.upper()
|
||||
mac = str(mac).upper()
|
||||
device = self.mac_to_dev.get(mac)
|
||||
if not device:
|
||||
dev_id = util.slugify(host_name or '') or util.slugify(mac)
|
||||
@ -211,7 +226,7 @@ class DeviceTracker(object):
|
||||
# If no device can be found, create it
|
||||
dev_id = util.ensure_unique_string(dev_id, self.devices.keys())
|
||||
device = Device(
|
||||
self.hass, self.consider_home, self.home_range, self.track_new,
|
||||
self.hass, self.consider_home, self.track_new,
|
||||
dev_id, mac, (host_name or dev_id).replace('_', ' '))
|
||||
self.devices[dev_id] = device
|
||||
if mac is not None:
|
||||
@ -234,7 +249,7 @@ class DeviceTracker(object):
|
||||
self.group = group.Group(
|
||||
self.hass, GROUP_NAME_ALL_DEVICES, entity_ids, False)
|
||||
|
||||
def update_stale(self, now):
|
||||
def update_stale(self, now: dt_util.dt.datetime):
|
||||
"""Update stale devices."""
|
||||
with self.lock:
|
||||
for device in self.devices.values():
|
||||
@ -246,19 +261,21 @@ class DeviceTracker(object):
|
||||
class Device(Entity):
|
||||
"""Represent a tracked device."""
|
||||
|
||||
host_name = None
|
||||
location_name = None
|
||||
gps = None
|
||||
host_name = None # type: str
|
||||
location_name = None # type: str
|
||||
gps = None # type: GPSType
|
||||
gps_accuracy = 0
|
||||
last_seen = None
|
||||
battery = None
|
||||
last_seen = None # type: dt_util.dt.datetime
|
||||
battery = None # type: str
|
||||
|
||||
# Track if the last update of this device was HOME.
|
||||
last_update_home = False
|
||||
_state = STATE_NOT_HOME
|
||||
|
||||
def __init__(self, hass, consider_home, home_range, track, dev_id, mac,
|
||||
name=None, picture=None, gravatar=None, away_hide=False):
|
||||
def __init__(self, hass: HomeAssistantType, consider_home: timedelta,
|
||||
track: bool, dev_id: str, mac: str, name: str=None,
|
||||
picture: str=None, gravatar: str=None,
|
||||
away_hide: bool=False) -> None:
|
||||
"""Initialize a device."""
|
||||
self.hass = hass
|
||||
self.entity_id = ENTITY_ID_FORMAT.format(dev_id)
|
||||
@ -267,8 +284,6 @@ class Device(Entity):
|
||||
# detected anymore.
|
||||
self.consider_home = consider_home
|
||||
|
||||
# Distance in meters
|
||||
self.home_range = home_range
|
||||
# Device ID
|
||||
self.dev_id = dev_id
|
||||
self.mac = mac
|
||||
@ -287,13 +302,6 @@ class Device(Entity):
|
||||
|
||||
self.away_hide = away_hide
|
||||
|
||||
@property
|
||||
def gps_home(self):
|
||||
"""Return if device is within range of home."""
|
||||
distance = max(
|
||||
0, self.hass.config.distance(*self.gps) - self.gps_accuracy)
|
||||
return self.gps is not None and distance <= self.home_range
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
"""Return the name of the entity."""
|
||||
@ -329,26 +337,24 @@ class Device(Entity):
|
||||
"""If device should be hidden."""
|
||||
return self.away_hide and self.state != STATE_HOME
|
||||
|
||||
def seen(self, host_name=None, location_name=None, gps=None,
|
||||
gps_accuracy=0, battery=None):
|
||||
def seen(self, host_name: str=None, location_name: str=None,
|
||||
gps: GPSType=None, gps_accuracy=0, battery: str=None):
|
||||
"""Mark the device as seen."""
|
||||
self.last_seen = dt_util.utcnow()
|
||||
self.host_name = host_name
|
||||
self.location_name = location_name
|
||||
self.gps_accuracy = gps_accuracy or 0
|
||||
self.battery = battery
|
||||
if gps is None:
|
||||
self.gps = None
|
||||
else:
|
||||
self.gps = None
|
||||
if gps is not None:
|
||||
try:
|
||||
self.gps = tuple(float(val) for val in gps)
|
||||
except ValueError:
|
||||
self.gps = float(gps[0]), float(gps[1])
|
||||
except (ValueError, TypeError, IndexError):
|
||||
_LOGGER.warning('Could not parse gps value for %s: %s',
|
||||
self.dev_id, gps)
|
||||
self.gps = None
|
||||
self.update()
|
||||
|
||||
def stale(self, now=None):
|
||||
def stale(self, now: dt_util.dt.datetime=None):
|
||||
"""Return if device state is stale."""
|
||||
return self.last_seen and \
|
||||
(now or dt_util.utcnow()) - self.last_seen > self.consider_home
|
||||
@ -377,32 +383,30 @@ class Device(Entity):
|
||||
self.last_update_home = True
|
||||
|
||||
|
||||
def load_config(path, hass, consider_home, home_range):
|
||||
def load_config(path: str, hass: HomeAssistantType, consider_home: timedelta):
|
||||
"""Load devices from YAML configuration file."""
|
||||
if not os.path.isfile(path):
|
||||
return []
|
||||
try:
|
||||
return [
|
||||
Device(hass, consider_home, home_range, device.get('track', False),
|
||||
Device(hass, consider_home, device.get('track', False),
|
||||
str(dev_id).lower(), str(device.get('mac')).upper(),
|
||||
device.get('name'), device.get('picture'),
|
||||
device.get('gravatar'),
|
||||
device.get(CONF_AWAY_HIDE, DEFAULT_AWAY_HIDE))
|
||||
for dev_id, device in load_yaml_config_file(path).items()]
|
||||
except HomeAssistantError:
|
||||
except (HomeAssistantError, FileNotFoundError):
|
||||
# When YAML file could not be loaded/did not contain a dict
|
||||
return []
|
||||
|
||||
|
||||
def setup_scanner_platform(hass, config, scanner, see_device):
|
||||
def setup_scanner_platform(hass: HomeAssistantType, config: ConfigType,
|
||||
scanner: Any, see_device: Callable):
|
||||
"""Helper method to connect scanner-based platform to device tracker."""
|
||||
interval = util.convert(config.get(CONF_SCAN_INTERVAL), int,
|
||||
DEFAULT_SCAN_INTERVAL)
|
||||
interval = config.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL)
|
||||
|
||||
# Initial scan of each mac we also tell about host name for config
|
||||
seen = set()
|
||||
seen = set() # type: Any
|
||||
|
||||
def device_tracker_scan(now):
|
||||
def device_tracker_scan(now: dt_util.dt.datetime):
|
||||
"""Called when interval matches."""
|
||||
for mac in scanner.scan_devices():
|
||||
if mac in seen:
|
||||
@ -418,7 +422,7 @@ def setup_scanner_platform(hass, config, scanner, see_device):
|
||||
device_tracker_scan(None)
|
||||
|
||||
|
||||
def update_config(path, dev_id, device):
|
||||
def update_config(path: str, dev_id: str, device: Device):
|
||||
"""Add device to YAML configuration file."""
|
||||
with open(path, 'a') as out:
|
||||
out.write('\n')
|
||||
@ -432,8 +436,8 @@ def update_config(path, dev_id, device):
|
||||
out.write(' {}: {}\n'.format(key, '' if value is None else value))
|
||||
|
||||
|
||||
def get_gravatar_for_email(email):
|
||||
def get_gravatar_for_email(email: str):
|
||||
"""Return an 80px Gravatar for the given email address."""
|
||||
import hashlib
|
||||
url = "https://www.gravatar.com/avatar/{}.jpg?s=80&d=wavatar"
|
||||
url = 'https://www.gravatar.com/avatar/{}.jpg?s=80&d=wavatar'
|
||||
return url.format(hashlib.md5(email.encode('utf-8').lower()).hexdigest())
|
||||
|
@ -63,7 +63,7 @@ def setup_scanner(hass, config, see):
|
||||
# Load all known devices.
|
||||
# We just need the devices so set consider_home and home range
|
||||
# to 0
|
||||
for device in load_config(yaml_path, hass, 0, 0):
|
||||
for device in load_config(yaml_path, hass, 0):
|
||||
# check if device is a valid bluetooth device
|
||||
if device.mac and device.mac[:3].upper() == BLE_PREFIX:
|
||||
if device.track:
|
||||
|
@ -45,7 +45,7 @@ def setup_scanner(hass, config, see):
|
||||
# Load all known devices.
|
||||
# We just need the devices so set consider_home and home range
|
||||
# to 0
|
||||
for device in load_config(yaml_path, hass, 0, 0):
|
||||
for device in load_config(yaml_path, hass, 0):
|
||||
# check if device is a valid bluetooth device
|
||||
if device.mac and device.mac[:3].upper() == BT_PREFIX:
|
||||
if device.track:
|
||||
|
@ -1,24 +1,13 @@
|
||||
"""Typing Helpers for Home-Assistant."""
|
||||
from typing import Dict, Any
|
||||
from typing import Dict, Any, Tuple
|
||||
|
||||
# NOTE: NewType added to typing in 3.5.2 in June, 2016; Since 3.5.2 includes
|
||||
# security fixes everyone on 3.5 should upgrade "soon"
|
||||
try:
|
||||
from typing import NewType
|
||||
except ImportError:
|
||||
NewType = None
|
||||
import homeassistant.core
|
||||
|
||||
# pylint: disable=invalid-name
|
||||
if NewType:
|
||||
ConfigType = NewType('ConfigType', Dict[str, Any])
|
||||
|
||||
# Custom type for recorder Queries
|
||||
QueryType = NewType('QueryType', Any)
|
||||
GPSType = Tuple[float, float]
|
||||
ConfigType = Dict[str, Any]
|
||||
HomeAssistantType = homeassistant.core.HomeAssistant
|
||||
|
||||
# Duplicates for 3.5.1
|
||||
# pylint: disable=invalid-name
|
||||
else:
|
||||
ConfigType = Dict[str, Any] # type: ignore
|
||||
|
||||
# Custom type for recorder Queries
|
||||
QueryType = Any # type: ignore
|
||||
# Custom type for recorder Queries
|
||||
QueryType = Any
|
||||
|
@ -12,7 +12,7 @@ import string
|
||||
from functools import wraps
|
||||
from types import MappingProxyType
|
||||
|
||||
from typing import Any, Optional, TypeVar, Callable, Sequence
|
||||
from typing import Any, Optional, TypeVar, Callable, Sequence, KeysView, Union
|
||||
|
||||
from .dt import as_local, utcnow
|
||||
|
||||
@ -63,8 +63,8 @@ def convert(value: T, to_type: Callable[[T], U],
|
||||
return default
|
||||
|
||||
|
||||
def ensure_unique_string(preferred_string: str,
|
||||
current_strings: Sequence[str]) -> str:
|
||||
def ensure_unique_string(preferred_string: str, current_strings:
|
||||
Union[Sequence[str], KeysView[str]]) -> str:
|
||||
"""Return a string that is not present in current_strings.
|
||||
|
||||
If preferred string exists will append _2, _3, ..
|
||||
|
@ -7,7 +7,7 @@ from io import StringIO
|
||||
import logging
|
||||
|
||||
from homeassistant import core as ha, loader
|
||||
from homeassistant.bootstrap import _setup_component
|
||||
from homeassistant.bootstrap import setup_component
|
||||
from homeassistant.helpers.entity import ToggleEntity
|
||||
from homeassistant.util.unit_system import METRIC_SYSTEM
|
||||
import homeassistant.util.dt as date_util
|
||||
@ -137,15 +137,15 @@ def mock_http_component(hass):
|
||||
hass.config.components.append('http')
|
||||
|
||||
|
||||
@mock.patch('homeassistant.components.mqtt.MQTT')
|
||||
def mock_mqtt_component(hass, mock_mqtt):
|
||||
def mock_mqtt_component(hass):
|
||||
"""Mock the MQTT component."""
|
||||
_setup_component(hass, mqtt.DOMAIN, {
|
||||
mqtt.DOMAIN: {
|
||||
mqtt.CONF_BROKER: 'mock-broker',
|
||||
}
|
||||
})
|
||||
return mock_mqtt
|
||||
with mock.patch('homeassistant.components.mqtt.MQTT') as mock_mqtt:
|
||||
setup_component(hass, mqtt.DOMAIN, {
|
||||
mqtt.DOMAIN: {
|
||||
mqtt.CONF_BROKER: 'mock-broker',
|
||||
}
|
||||
})
|
||||
return mock_mqtt
|
||||
|
||||
|
||||
class MockModule(object):
|
||||
|
@ -1,10 +1,10 @@
|
||||
"""The tests for the device tracker component."""
|
||||
# pylint: disable=protected-access,too-many-public-methods
|
||||
import logging
|
||||
import unittest
|
||||
from unittest.mock import patch
|
||||
from datetime import datetime, timedelta
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
from homeassistant.loader import get_component
|
||||
import homeassistant.util.dt as dt_util
|
||||
@ -12,13 +12,21 @@ from homeassistant.const import (
|
||||
ATTR_ENTITY_ID, ATTR_ENTITY_PICTURE, ATTR_FRIENDLY_NAME, ATTR_HIDDEN,
|
||||
STATE_HOME, STATE_NOT_HOME, CONF_PLATFORM)
|
||||
import homeassistant.components.device_tracker as device_tracker
|
||||
from homeassistant.exceptions import HomeAssistantError
|
||||
|
||||
from tests.common import (
|
||||
get_test_home_assistant, fire_time_changed, fire_service_discovered)
|
||||
get_test_home_assistant, fire_time_changed, fire_service_discovered,
|
||||
patch_yaml_files)
|
||||
|
||||
TEST_PLATFORM = {device_tracker.DOMAIN: {CONF_PLATFORM: 'test'}}
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TestComponentsDeviceTracker(unittest.TestCase):
|
||||
"""Test the Device tracker."""
|
||||
hass = None # HomeAssistant
|
||||
yaml_devices = None # type: str
|
||||
|
||||
def setUp(self): # pylint: disable=invalid-name
|
||||
"""Setup things to be run when tests are started."""
|
||||
@ -48,27 +56,28 @@ class TestComponentsDeviceTracker(unittest.TestCase):
|
||||
|
||||
def test_reading_broken_yaml_config(self): # pylint: disable=no-self-use
|
||||
"""Test when known devices contains invalid data."""
|
||||
with tempfile.NamedTemporaryFile() as fpt:
|
||||
# file is empty
|
||||
assert device_tracker.load_config(fpt.name, None, False, 0) == []
|
||||
|
||||
fpt.write('100'.encode('utf-8'))
|
||||
fpt.flush()
|
||||
|
||||
# file contains a non-dict format
|
||||
assert device_tracker.load_config(fpt.name, None, False, 0) == []
|
||||
files = {'empty.yaml': '',
|
||||
'bad.yaml': '100',
|
||||
'ok.yaml': 'my_device:\n name: Device'}
|
||||
with patch_yaml_files(files):
|
||||
# File is empty
|
||||
assert device_tracker.load_config('empty.yaml', None, False) == []
|
||||
# File contains a non-dict format
|
||||
assert device_tracker.load_config('bad.yaml', None, False) == []
|
||||
# A file that works fine
|
||||
assert len(device_tracker.load_config('ok.yaml', None, False)) == 1
|
||||
|
||||
def test_reading_yaml_config(self):
|
||||
"""Test the rendering of the YAML configuration."""
|
||||
dev_id = 'test'
|
||||
device = device_tracker.Device(
|
||||
self.hass, timedelta(seconds=180), 0, True, dev_id,
|
||||
self.hass, timedelta(seconds=180), True, dev_id,
|
||||
'AB:CD:EF:GH:IJ', 'Test name', picture='http://test.picture',
|
||||
away_hide=True)
|
||||
device_tracker.update_config(self.yaml_devices, dev_id, device)
|
||||
self.assertTrue(device_tracker.setup(self.hass, {}))
|
||||
self.assertTrue(device_tracker.setup(self.hass, TEST_PLATFORM))
|
||||
config = device_tracker.load_config(self.yaml_devices, self.hass,
|
||||
device.consider_home, 0)[0]
|
||||
device.consider_home)[0]
|
||||
self.assertEqual(device.dev_id, config.dev_id)
|
||||
self.assertEqual(device.track, config.track)
|
||||
self.assertEqual(device.mac, config.mac)
|
||||
@ -76,12 +85,45 @@ class TestComponentsDeviceTracker(unittest.TestCase):
|
||||
self.assertEqual(device.away_hide, config.away_hide)
|
||||
self.assertEqual(device.consider_home, config.consider_home)
|
||||
|
||||
@patch('homeassistant.components.device_tracker._LOGGER.warning')
|
||||
def test_track_with_duplicate_mac_dev_id(self, mock_warning): \
|
||||
# pylint: disable=invalid-name
|
||||
"""Test adding duplicate MACs or device IDs to DeviceTracker."""
|
||||
|
||||
devices = [
|
||||
device_tracker.Device(self.hass, True, True, 'my_device', 'AB:01',
|
||||
'My device', None, None, False),
|
||||
device_tracker.Device(self.hass, True, True, 'your_device',
|
||||
'AB:01', 'Your device', None, None, False)]
|
||||
device_tracker.DeviceTracker(self.hass, False, True, devices)
|
||||
_LOGGER.debug(mock_warning.call_args_list)
|
||||
assert mock_warning.call_count == 1, \
|
||||
"The only warning call should be duplicates (check DEBUG)"
|
||||
args, _ = mock_warning.call_args
|
||||
assert 'Duplicate device MAC' in args[0], \
|
||||
'Duplicate MAC warning expected'
|
||||
|
||||
mock_warning.reset_mock()
|
||||
devices = [
|
||||
device_tracker.Device(self.hass, True, True, 'my_device',
|
||||
'AB:01', 'My device', None, None, False),
|
||||
device_tracker.Device(self.hass, True, True, 'my_device',
|
||||
None, 'Your device', None, None, False)]
|
||||
device_tracker.DeviceTracker(self.hass, False, True, devices)
|
||||
|
||||
_LOGGER.debug(mock_warning.call_args_list)
|
||||
assert mock_warning.call_count == 1, \
|
||||
"The only warning call should be duplicates (check DEBUG)"
|
||||
args, _ = mock_warning.call_args
|
||||
assert 'Duplicate device IDs' in args[0], \
|
||||
'Duplicate device IDs warning expected'
|
||||
|
||||
def test_setup_without_yaml_file(self):
|
||||
"""Test with no YAML file."""
|
||||
self.assertTrue(device_tracker.setup(self.hass, {}))
|
||||
self.assertTrue(device_tracker.setup(self.hass, TEST_PLATFORM))
|
||||
|
||||
# pylint: disable=invalid-name
|
||||
def test_adding_unknown_device_to_config(self):
|
||||
def test_adding_unknown_device_to_config(self): \
|
||||
# pylint: disable=invalid-name
|
||||
"""Test the adding of unknown devices to configuration file."""
|
||||
scanner = get_component('device_tracker.test').SCANNER
|
||||
scanner.reset()
|
||||
@ -90,7 +132,7 @@ class TestComponentsDeviceTracker(unittest.TestCase):
|
||||
self.assertTrue(device_tracker.setup(self.hass, {
|
||||
device_tracker.DOMAIN: {CONF_PLATFORM: 'test'}}))
|
||||
config = device_tracker.load_config(self.yaml_devices, self.hass,
|
||||
timedelta(seconds=0), 0)
|
||||
timedelta(seconds=0))
|
||||
assert len(config) == 1
|
||||
assert config[0].dev_id == 'dev1'
|
||||
assert config[0].track
|
||||
@ -99,7 +141,7 @@ class TestComponentsDeviceTracker(unittest.TestCase):
|
||||
"""Test the Gravatar generation."""
|
||||
dev_id = 'test'
|
||||
device = device_tracker.Device(
|
||||
self.hass, timedelta(seconds=180), 0, True, dev_id,
|
||||
self.hass, timedelta(seconds=180), True, dev_id,
|
||||
'AB:CD:EF:GH:IJ', 'Test name', gravatar='test@example.com')
|
||||
gravatar_url = ("https://www.gravatar.com/avatar/"
|
||||
"55502f40dc8b7c769880b10874abc9d0.jpg?s=80&d=wavatar")
|
||||
@ -109,7 +151,7 @@ class TestComponentsDeviceTracker(unittest.TestCase):
|
||||
"""Test that Gravatar overrides picture."""
|
||||
dev_id = 'test'
|
||||
device = device_tracker.Device(
|
||||
self.hass, timedelta(seconds=180), 0, True, dev_id,
|
||||
self.hass, timedelta(seconds=180), True, dev_id,
|
||||
'AB:CD:EF:GH:IJ', 'Test name', picture='http://test.picture',
|
||||
gravatar='test@example.com')
|
||||
gravatar_url = ("https://www.gravatar.com/avatar/"
|
||||
@ -122,8 +164,8 @@ class TestComponentsDeviceTracker(unittest.TestCase):
|
||||
|
||||
with patch.dict(device_tracker.DISCOVERY_PLATFORMS, {'test': 'test'}):
|
||||
with patch.object(scanner, 'scan_devices') as mock_scan:
|
||||
self.assertTrue(device_tracker.setup(self.hass, {
|
||||
device_tracker.DOMAIN: {CONF_PLATFORM: 'test'}}))
|
||||
self.assertTrue(device_tracker.setup(self.hass,
|
||||
TEST_PLATFORM))
|
||||
fire_service_discovered(self.hass, 'test', {})
|
||||
self.assertTrue(mock_scan.called)
|
||||
|
||||
@ -139,9 +181,9 @@ class TestComponentsDeviceTracker(unittest.TestCase):
|
||||
with patch('homeassistant.components.device_tracker.dt_util.utcnow',
|
||||
return_value=register_time):
|
||||
self.assertTrue(device_tracker.setup(self.hass, {
|
||||
'device_tracker': {
|
||||
'platform': 'test',
|
||||
'consider_home': 59,
|
||||
device_tracker.DOMAIN: {
|
||||
CONF_PLATFORM: 'test',
|
||||
device_tracker.CONF_CONSIDER_HOME: 59,
|
||||
}}))
|
||||
|
||||
self.assertEqual(STATE_HOME,
|
||||
@ -165,11 +207,11 @@ class TestComponentsDeviceTracker(unittest.TestCase):
|
||||
picture = 'http://placehold.it/200x200'
|
||||
|
||||
device = device_tracker.Device(
|
||||
self.hass, timedelta(seconds=180), 0, True, dev_id, None,
|
||||
self.hass, timedelta(seconds=180), True, dev_id, None,
|
||||
friendly_name, picture, away_hide=True)
|
||||
device_tracker.update_config(self.yaml_devices, dev_id, device)
|
||||
|
||||
self.assertTrue(device_tracker.setup(self.hass, {}))
|
||||
self.assertTrue(device_tracker.setup(self.hass, TEST_PLATFORM))
|
||||
|
||||
attrs = self.hass.states.get(entity_id).attributes
|
||||
|
||||
@ -181,15 +223,14 @@ class TestComponentsDeviceTracker(unittest.TestCase):
|
||||
dev_id = 'test_entity'
|
||||
entity_id = device_tracker.ENTITY_ID_FORMAT.format(dev_id)
|
||||
device = device_tracker.Device(
|
||||
self.hass, timedelta(seconds=180), 0, True, dev_id, None,
|
||||
self.hass, timedelta(seconds=180), True, dev_id, None,
|
||||
away_hide=True)
|
||||
device_tracker.update_config(self.yaml_devices, dev_id, device)
|
||||
|
||||
scanner = get_component('device_tracker.test').SCANNER
|
||||
scanner.reset()
|
||||
|
||||
self.assertTrue(device_tracker.setup(self.hass, {
|
||||
device_tracker.DOMAIN: {CONF_PLATFORM: 'test'}}))
|
||||
self.assertTrue(device_tracker.setup(self.hass, TEST_PLATFORM))
|
||||
|
||||
self.assertTrue(self.hass.states.get(entity_id)
|
||||
.attributes.get(ATTR_HIDDEN))
|
||||
@ -199,15 +240,14 @@ class TestComponentsDeviceTracker(unittest.TestCase):
|
||||
dev_id = 'test_entity'
|
||||
entity_id = device_tracker.ENTITY_ID_FORMAT.format(dev_id)
|
||||
device = device_tracker.Device(
|
||||
self.hass, timedelta(seconds=180), 0, True, dev_id, None,
|
||||
self.hass, timedelta(seconds=180), True, dev_id, None,
|
||||
away_hide=True)
|
||||
device_tracker.update_config(self.yaml_devices, dev_id, device)
|
||||
|
||||
scanner = get_component('device_tracker.test').SCANNER
|
||||
scanner.reset()
|
||||
|
||||
self.assertTrue(device_tracker.setup(self.hass, {
|
||||
device_tracker.DOMAIN: {CONF_PLATFORM: 'test'}}))
|
||||
self.assertTrue(device_tracker.setup(self.hass, TEST_PLATFORM))
|
||||
|
||||
state = self.hass.states.get(device_tracker.ENTITY_ID_ALL_DEVICES)
|
||||
self.assertIsNotNone(state)
|
||||
@ -217,40 +257,31 @@ class TestComponentsDeviceTracker(unittest.TestCase):
|
||||
|
||||
@patch('homeassistant.components.device_tracker.DeviceTracker.see')
|
||||
def test_see_service(self, mock_see):
|
||||
"""Test the see service."""
|
||||
self.assertTrue(device_tracker.setup(self.hass, {}))
|
||||
mac = 'AB:CD:EF:GH'
|
||||
dev_id = 'some_device'
|
||||
host_name = 'example.com'
|
||||
location_name = 'Work'
|
||||
gps = [.3, .8]
|
||||
|
||||
device_tracker.see(self.hass, mac, dev_id, host_name, location_name,
|
||||
gps)
|
||||
|
||||
self.hass.pool.block_till_done()
|
||||
|
||||
mock_see.assert_called_once_with(
|
||||
mac=mac, dev_id=dev_id, host_name=host_name,
|
||||
location_name=location_name, gps=gps)
|
||||
|
||||
@patch('homeassistant.components.device_tracker.DeviceTracker.see')
|
||||
def test_see_service_unicode_dev_id(self, mock_see):
|
||||
"""Test the see service with a unicode dev_id and NO MAC."""
|
||||
self.assertTrue(device_tracker.setup(self.hass, {}))
|
||||
self.assertTrue(device_tracker.setup(self.hass, TEST_PLATFORM))
|
||||
params = {
|
||||
'dev_id': chr(233), # e' acute accent from icloud
|
||||
'dev_id': 'some_device',
|
||||
'host_name': 'example.com',
|
||||
'location_name': 'Work',
|
||||
'gps': [.3, .8]
|
||||
}
|
||||
device_tracker.see(self.hass, **params)
|
||||
self.hass.pool.block_till_done()
|
||||
assert mock_see.call_count == 1
|
||||
mock_see.assert_called_once_with(**params)
|
||||
|
||||
def test_not_write_duplicate_yaml_keys(self):
|
||||
mock_see.reset_mock()
|
||||
params['dev_id'] += chr(233) # e' acute accent from icloud
|
||||
|
||||
device_tracker.see(self.hass, **params)
|
||||
self.hass.pool.block_till_done()
|
||||
assert mock_see.call_count == 1
|
||||
mock_see.assert_called_once_with(**params)
|
||||
|
||||
def test_not_write_duplicate_yaml_keys(self): \
|
||||
# pylint: disable=invalid-name
|
||||
"""Test that the device tracker will not generate invalid YAML."""
|
||||
self.assertTrue(device_tracker.setup(self.hass, {}))
|
||||
self.assertTrue(device_tracker.setup(self.hass, TEST_PLATFORM))
|
||||
|
||||
device_tracker.see(self.hass, 'mac_1', host_name='hello')
|
||||
device_tracker.see(self.hass, 'mac_2', host_name='hello')
|
||||
@ -258,15 +289,46 @@ class TestComponentsDeviceTracker(unittest.TestCase):
|
||||
self.hass.pool.block_till_done()
|
||||
|
||||
config = device_tracker.load_config(self.yaml_devices, self.hass,
|
||||
timedelta(seconds=0), 0)
|
||||
timedelta(seconds=0))
|
||||
assert len(config) == 2
|
||||
|
||||
def test_not_allow_invalid_dev_id(self):
|
||||
def test_not_allow_invalid_dev_id(self): # pylint: disable=invalid-name
|
||||
"""Test that the device tracker will not allow invalid dev ids."""
|
||||
self.assertTrue(device_tracker.setup(self.hass, {}))
|
||||
self.assertTrue(device_tracker.setup(self.hass, TEST_PLATFORM))
|
||||
|
||||
device_tracker.see(self.hass, dev_id='hello-world')
|
||||
|
||||
config = device_tracker.load_config(self.yaml_devices, self.hass,
|
||||
timedelta(seconds=0), 0)
|
||||
timedelta(seconds=0))
|
||||
assert len(config) == 0
|
||||
|
||||
@patch('homeassistant.components.device_tracker._LOGGER.warning')
|
||||
def test_see_failures(self, mock_warning):
|
||||
"""Test that the device tracker see failures."""
|
||||
tracker = device_tracker.DeviceTracker(
|
||||
self.hass, timedelta(seconds=60), 0, [])
|
||||
|
||||
# MAC is not a string (but added)
|
||||
tracker.see(mac=567, host_name="Number MAC")
|
||||
|
||||
# No device id or MAC(not added)
|
||||
with self.assertRaises(HomeAssistantError):
|
||||
tracker.see()
|
||||
assert mock_warning.call_count == 0
|
||||
|
||||
# Ignore gps on invalid GPS (both added & warnings)
|
||||
tracker.see(mac='mac_1_bad_gps', gps=1)
|
||||
tracker.see(mac='mac_2_bad_gps', gps=[1])
|
||||
tracker.see(mac='mac_3_bad_gps', gps='gps')
|
||||
config = device_tracker.load_config(self.yaml_devices, self.hass,
|
||||
timedelta(seconds=0))
|
||||
assert mock_warning.call_count == 3
|
||||
|
||||
assert len(config) == 4
|
||||
|
||||
@patch('homeassistant.components.device_tracker.log_exception')
|
||||
def test_config_failure(self, mock_ex):
|
||||
"""Test that the device tracker see failures."""
|
||||
device_tracker.setup(self.hass, {device_tracker.DOMAIN: {
|
||||
device_tracker.CONF_CONSIDER_HOME: -1}})
|
||||
assert mock_ex.call_count == 1
|
||||
|
@ -8,17 +8,19 @@ import requests
|
||||
from homeassistant import bootstrap, const
|
||||
import homeassistant.components.device_tracker as device_tracker
|
||||
import homeassistant.components.http as http
|
||||
from homeassistant.const import CONF_PLATFORM
|
||||
|
||||
from tests.common import get_test_home_assistant, get_test_instance_port
|
||||
|
||||
SERVER_PORT = get_test_instance_port()
|
||||
HTTP_BASE_URL = "http://127.0.0.1:{}".format(SERVER_PORT)
|
||||
|
||||
hass = None
|
||||
hass = None # pylint: disable=invalid-name
|
||||
|
||||
|
||||
def _url(data={}):
|
||||
def _url(data=None):
|
||||
"""Helper method to generate URLs."""
|
||||
data = data or {}
|
||||
data = "&".join(["{}={}".format(name, value) for
|
||||
name, value in data.items()])
|
||||
return "{}{}locative?{}".format(HTTP_BASE_URL, const.URL_API, data)
|
||||
@ -26,7 +28,7 @@ def _url(data={}):
|
||||
|
||||
def setUpModule(): # pylint: disable=invalid-name
|
||||
"""Initalize a Home Assistant server."""
|
||||
global hass
|
||||
global hass # pylint: disable=invalid-name
|
||||
|
||||
hass = get_test_home_assistant()
|
||||
bootstrap.setup_component(hass, http.DOMAIN, {
|
||||
@ -38,7 +40,7 @@ def setUpModule(): # pylint: disable=invalid-name
|
||||
# Set up device tracker
|
||||
bootstrap.setup_component(hass, device_tracker.DOMAIN, {
|
||||
device_tracker.DOMAIN: {
|
||||
'platform': 'locative'
|
||||
CONF_PLATFORM: 'locative'
|
||||
}
|
||||
})
|
||||
|
||||
|
@ -1,5 +1,7 @@
|
||||
"""The tests for the MQTT device tracker platform."""
|
||||
import unittest
|
||||
from unittest.mock import patch
|
||||
import logging
|
||||
import os
|
||||
|
||||
from homeassistant.bootstrap import _setup_component
|
||||
@ -9,6 +11,8 @@ from homeassistant.const import CONF_PLATFORM
|
||||
from tests.common import (
|
||||
get_test_home_assistant, mock_mqtt_component, fire_mqtt_message)
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TestComponentsDeviceTrackerMQTT(unittest.TestCase):
|
||||
"""Test MQTT device tracker platform."""
|
||||
@ -25,6 +29,27 @@ class TestComponentsDeviceTrackerMQTT(unittest.TestCase):
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
|
||||
def test_ensure_device_tracker_platform_validation(self): \
|
||||
# pylint: disable=invalid-name
|
||||
"""Test if platform validation was done."""
|
||||
def mock_setup_scanner(hass, config, see):
|
||||
"""Check that Qos was added by validation."""
|
||||
self.assertTrue('qos' in config)
|
||||
|
||||
with patch('homeassistant.components.device_tracker.mqtt.'
|
||||
'setup_scanner', side_effect=mock_setup_scanner) as mock_sp:
|
||||
|
||||
dev_id = 'paulus'
|
||||
topic = '/location/paulus'
|
||||
self.hass.config.components = ['mqtt', 'zone']
|
||||
assert _setup_component(self.hass, device_tracker.DOMAIN, {
|
||||
device_tracker.DOMAIN: {
|
||||
CONF_PLATFORM: 'mqtt',
|
||||
'devices': {dev_id: topic}
|
||||
}
|
||||
})
|
||||
assert mock_sp.call_count == 1
|
||||
|
||||
def test_new_message(self):
|
||||
"""Test new message."""
|
||||
dev_id = 'paulus'
|
||||
|
@ -22,12 +22,12 @@ def setUpModule(): # pylint: disable=invalid-name
|
||||
"""Write a device tracker known devices file to be used."""
|
||||
device_tracker.update_config(
|
||||
KNOWN_DEV_YAML_PATH, 'device_1', device_tracker.Device(
|
||||
None, None, None, True, 'device_1', 'DEV1',
|
||||
None, None, True, 'device_1', 'DEV1',
|
||||
picture='http://example.com/dev1.jpg'))
|
||||
|
||||
device_tracker.update_config(
|
||||
KNOWN_DEV_YAML_PATH, 'device_2', device_tracker.Device(
|
||||
None, None, None, True, 'device_2', 'DEV2',
|
||||
None, None, True, 'device_2', 'DEV2',
|
||||
picture='http://example.com/dev2.jpg'))
|
||||
|
||||
|
||||
@ -83,7 +83,8 @@ class TestDeviceSunLightTrigger(unittest.TestCase):
|
||||
|
||||
self.assertTrue(light.is_on(self.hass))
|
||||
|
||||
def test_lights_turn_off_when_everyone_leaves(self):
|
||||
def test_lights_turn_off_when_everyone_leaves(self): \
|
||||
# pylint: disable=invalid-name
|
||||
"""Test lights turn off when everyone leaves the house."""
|
||||
light.turn_on(self.hass)
|
||||
|
||||
@ -99,7 +100,8 @@ class TestDeviceSunLightTrigger(unittest.TestCase):
|
||||
|
||||
self.assertFalse(light.is_on(self.hass))
|
||||
|
||||
def test_lights_turn_on_when_coming_home_after_sun_set(self):
|
||||
def test_lights_turn_on_when_coming_home_after_sun_set(self): \
|
||||
# pylint: disable=invalid-name
|
||||
"""Test lights turn on when coming home after sun set."""
|
||||
light.turn_off(self.hass)
|
||||
ensure_sun_set(self.hass)
|
||||
|
Loading…
x
Reference in New Issue
Block a user