Async version for asuswrt (#17692)

* Testing async data for asuswrt

* Moved to lib
This commit is contained in:
kennedyshead 2018-10-23 11:08:11 +02:00 committed by Paulus Schoutsen
parent 50f0eac7f3
commit 277a9a3995
5 changed files with 26 additions and 794 deletions

View File

@ -58,6 +58,7 @@ homeassistant/components/climate/mill.py @danielhiversen
homeassistant/components/climate/sensibo.py @andrey-git homeassistant/components/climate/sensibo.py @andrey-git
homeassistant/components/cover/group.py @cdce8p homeassistant/components/cover/group.py @cdce8p
homeassistant/components/cover/template.py @PhracturedBlue homeassistant/components/cover/template.py @PhracturedBlue
homeassistant/components/device_tracker/asuswrt.py @kennedyshead
homeassistant/components/device_tracker/automatic.py @armills homeassistant/components/device_tracker/automatic.py @armills
homeassistant/components/device_tracker/huawei_router.py @abmantis homeassistant/components/device_tracker/huawei_router.py @abmantis
homeassistant/components/device_tracker/quantum_gateway.py @cisasteelersfan homeassistant/components/device_tracker/quantum_gateway.py @cisasteelersfan

View File

@ -5,10 +5,6 @@ For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/device_tracker.asuswrt/ https://home-assistant.io/components/device_tracker.asuswrt/
""" """
import logging import logging
import re
import socket
import telnetlib
from collections import namedtuple
import voluptuous as vol import voluptuous as vol
@ -19,7 +15,7 @@ from homeassistant.const import (
CONF_HOST, CONF_PASSWORD, CONF_USERNAME, CONF_PORT, CONF_MODE, CONF_HOST, CONF_PASSWORD, CONF_USERNAME, CONF_PORT, CONF_MODE,
CONF_PROTOCOL) CONF_PROTOCOL)
REQUIREMENTS = ['pexpect==4.6.0'] REQUIREMENTS = ['aioasuswrt==1.0.0']
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -44,345 +40,55 @@ PLATFORM_SCHEMA = vol.All(
})) }))
_LEASES_CMD = 'cat /var/lib/misc/dnsmasq.leases' async def async_get_scanner(hass, config):
_LEASES_REGEX = re.compile(
r'\w+\s' +
r'(?P<mac>(([0-9a-f]{2}[:-]){5}([0-9a-f]{2})))\s' +
r'(?P<ip>([0-9]{1,3}[\.]){3}[0-9]{1,3})\s' +
r'(?P<host>([^\s]+))')
# Command to get both 5GHz and 2.4GHz clients
_WL_CMD = 'for dev in `nvram get wl_ifnames`; do wl -i $dev assoclist; done'
_WL_REGEX = re.compile(
r'\w+\s' +
r'(?P<mac>(([0-9A-F]{2}[:-]){5}([0-9A-F]{2})))')
_IP_NEIGH_CMD = 'ip neigh'
_IP_NEIGH_REGEX = re.compile(
r'(?P<ip>([0-9]{1,3}[\.]){3}[0-9]{1,3}|'
r'([0-9a-fA-F]{1,4}:){1,7}[0-9a-fA-F]{0,4}(:[0-9a-fA-F]{1,4}){1,7})\s'
r'\w+\s'
r'\w+\s'
r'(\w+\s(?P<mac>(([0-9a-f]{2}[:-]){5}([0-9a-f]{2}))))?\s'
r'\s?(router)?'
r'\s?(nud)?'
r'(?P<status>(\w+))')
_ARP_CMD = 'arp -n'
_ARP_REGEX = re.compile(
r'.+\s' +
r'\((?P<ip>([0-9]{1,3}[\.]){3}[0-9]{1,3})\)\s' +
r'.+\s' +
r'(?P<mac>(([0-9a-f]{2}[:-]){5}([0-9a-f]{2})))' +
r'\s' +
r'.*')
def get_scanner(hass, config):
"""Validate the configuration and return an ASUS-WRT scanner.""" """Validate the configuration and return an ASUS-WRT scanner."""
scanner = AsusWrtDeviceScanner(config[DOMAIN]) scanner = AsusWrtDeviceScanner(config[DOMAIN])
await scanner.async_connect()
return scanner if scanner.success_init else None return scanner if scanner.success_init else None
def _parse_lines(lines, regex):
"""Parse the lines using the given regular expression.
If a line can't be parsed it is logged and skipped in the output.
"""
results = []
for line in lines:
match = regex.search(line)
if not match:
_LOGGER.debug("Could not parse row: %s", line)
continue
results.append(match.groupdict())
return results
Device = namedtuple('Device', ['mac', 'ip', 'name'])
class AsusWrtDeviceScanner(DeviceScanner): class AsusWrtDeviceScanner(DeviceScanner):
"""This class queries a router running ASUSWRT firmware.""" """This class queries a router running ASUSWRT firmware."""
# Eighth attribute needed for mode (AP mode vs router mode) # Eighth attribute needed for mode (AP mode vs router mode)
def __init__(self, config): def __init__(self, config):
"""Initialize the scanner.""" """Initialize the scanner."""
self.host = config[CONF_HOST] from aioasuswrt.asuswrt import AsusWrt
self.username = config[CONF_USERNAME]
self.password = config.get(CONF_PASSWORD, '')
self.ssh_key = config.get('ssh_key', config.get('pub_key', ''))
self.protocol = config[CONF_PROTOCOL]
self.mode = config[CONF_MODE]
self.port = config[CONF_PORT]
self.require_ip = config[CONF_REQUIRE_IP]
if self.protocol == 'ssh': self.last_results = {}
self.connection = SshConnection( self.success_init = False
self.host, self.port, self.username, self.password, self.connection = AsusWrt(config[CONF_HOST], config[CONF_PORT],
self.ssh_key) config[CONF_PROTOCOL] == 'telnet',
else: config[CONF_USERNAME],
self.connection = TelnetConnection( config.get(CONF_PASSWORD, ''),
self.host, self.port, self.username, self.password) config.get('ssh_key',
config.get('pub_key', '')),
config[CONF_MODE], config[CONF_REQUIRE_IP])
async def async_connect(self):
"""Initialize connection to the router."""
self.last_results = {} self.last_results = {}
# Test the router is accessible. # Test the router is accessible.
data = self.get_asuswrt_data() data = await self.connection.async_get_connected_devices()
self.success_init = data is not None self.success_init = data is not None
def scan_devices(self): async def async_scan_devices(self):
"""Scan for new devices and return a list with found device IDs.""" """Scan for new devices and return a list with found device IDs."""
self._update_info() await self.async_update_info()
return list(self.last_results.keys()) return list(self.last_results.keys())
def get_device_name(self, device): async def async_get_device_name(self, device):
"""Return the name of the given device or None if we don't know.""" """Return the name of the given device or None if we don't know."""
if device not in self.last_results: if device not in self.last_results:
return None return None
return self.last_results[device].name return self.last_results[device].name
def _update_info(self): async def async_update_info(self):
"""Ensure the information from the ASUSWRT router is up to date. """Ensure the information from the ASUSWRT router is up to date.
Return boolean if scanning successful. Return boolean if scanning successful.
""" """
if not self.success_init:
return False
_LOGGER.info('Checking Devices') _LOGGER.info('Checking Devices')
data = self.get_asuswrt_data()
if not data:
return False
self.last_results = data self.last_results = await self.connection.async_get_connected_devices()
return True
def get_asuswrt_data(self):
"""Retrieve data from ASUSWRT.
Calls various commands on the router and returns the superset of all
responses. Some commands will not work on some routers.
"""
devices = {}
devices.update(self._get_wl())
devices.update(self._get_arp())
devices.update(self._get_neigh(devices))
if not self.mode == 'ap':
devices.update(self._get_leases(devices))
ret_devices = {}
for key in devices:
if not self.require_ip or devices[key].ip is not None:
ret_devices[key] = devices[key]
return ret_devices
def _get_wl(self):
lines = self.connection.run_command(_WL_CMD)
if not lines:
return {}
result = _parse_lines(lines, _WL_REGEX)
devices = {}
for device in result:
mac = device['mac'].upper()
devices[mac] = Device(mac, None, None)
return devices
def _get_leases(self, cur_devices):
lines = self.connection.run_command(_LEASES_CMD)
if not lines:
return {}
lines = [line for line in lines if not line.startswith('duid ')]
result = _parse_lines(lines, _LEASES_REGEX)
devices = {}
for device in result:
# For leases where the client doesn't set a hostname, ensure it
# is blank and not '*', which breaks entity_id down the line.
host = device['host']
if host == '*':
host = ''
mac = device['mac'].upper()
if mac in cur_devices:
devices[mac] = Device(mac, device['ip'], host)
return devices
def _get_neigh(self, cur_devices):
lines = self.connection.run_command(_IP_NEIGH_CMD)
if not lines:
return {}
result = _parse_lines(lines, _IP_NEIGH_REGEX)
devices = {}
for device in result:
status = device['status']
if status is None or status.upper() != 'REACHABLE':
continue
if device['mac'] is not None:
mac = device['mac'].upper()
old_device = cur_devices.get(mac)
old_ip = old_device.ip if old_device else None
devices[mac] = Device(mac, device.get('ip', old_ip), None)
return devices
def _get_arp(self):
lines = self.connection.run_command(_ARP_CMD)
if not lines:
return {}
result = _parse_lines(lines, _ARP_REGEX)
devices = {}
for device in result:
if device['mac'] is not None:
mac = device['mac'].upper()
devices[mac] = Device(mac, device['ip'], None)
return devices
class _Connection:
def __init__(self):
self._connected = False
@property
def connected(self):
"""Return connection state."""
return self._connected
def connect(self):
"""Mark current connection state as connected."""
self._connected = True
def disconnect(self):
"""Mark current connection state as disconnected."""
self._connected = False
class SshConnection(_Connection):
"""Maintains an SSH connection to an ASUS-WRT router."""
def __init__(self, host, port, username, password, ssh_key):
"""Initialize the SSH connection properties."""
super().__init__()
self._ssh = None
self._host = host
self._port = port
self._username = username
self._password = password
self._ssh_key = ssh_key
def run_command(self, command):
"""Run commands through an SSH connection.
Connect to the SSH server if not currently connected, otherwise
use the existing connection.
"""
from pexpect import pxssh, exceptions
try:
if not self.connected:
self.connect()
self._ssh.sendline(command)
self._ssh.prompt()
lines = self._ssh.before.split(b'\n')[1:-1]
return [line.decode('utf-8') for line in lines]
except exceptions.EOF as err:
_LOGGER.error("Connection refused. %s", self._ssh.before)
self.disconnect()
return None
except pxssh.ExceptionPxssh as err:
_LOGGER.error("Unexpected SSH error: %s", err)
self.disconnect()
return None
except AssertionError as err:
_LOGGER.error("Connection to router unavailable: %s", err)
self.disconnect()
return None
def connect(self):
"""Connect to the ASUS-WRT SSH server."""
from pexpect import pxssh
self._ssh = pxssh.pxssh()
if self._ssh_key:
self._ssh.login(self._host, self._username, quiet=False,
ssh_key=self._ssh_key, port=self._port)
else:
self._ssh.login(self._host, self._username, quiet=False,
password=self._password, port=self._port)
super().connect()
def disconnect(self):
"""Disconnect the current SSH connection."""
try:
self._ssh.logout()
except Exception: # pylint: disable=broad-except
pass
finally:
self._ssh = None
super().disconnect()
class TelnetConnection(_Connection):
"""Maintains a Telnet connection to an ASUS-WRT router."""
def __init__(self, host, port, username, password):
"""Initialize the Telnet connection properties."""
super().__init__()
self._telnet = None
self._host = host
self._port = port
self._username = username
self._password = password
self._prompt_string = None
def run_command(self, command):
"""Run a command through a Telnet connection.
Connect to the Telnet server if not currently connected, otherwise
use the existing connection.
"""
try:
if not self.connected:
self.connect()
self._telnet.write('{}\n'.format(command).encode('ascii'))
data = (self._telnet.read_until(self._prompt_string).
split(b'\n')[1:-1])
return [line.decode('utf-8') for line in data]
except EOFError:
_LOGGER.error("Unexpected response from router")
self.disconnect()
return None
except ConnectionRefusedError:
_LOGGER.error("Connection refused by router. Telnet enabled?")
self.disconnect()
return None
except socket.gaierror as exc:
_LOGGER.error("Socket exception: %s", exc)
self.disconnect()
return None
except OSError as exc:
_LOGGER.error("OSError: %s", exc)
self.disconnect()
return None
def connect(self):
"""Connect to the ASUS-WRT Telnet server."""
self._telnet = telnetlib.Telnet(self._host)
self._telnet.read_until(b'login: ')
self._telnet.write((self._username + '\n').encode('ascii'))
self._telnet.read_until(b'Password: ')
self._telnet.write((self._password + '\n').encode('ascii'))
self._prompt_string = self._telnet.read_until(b'#').split(b'\n')[-1]
super().connect()
def disconnect(self):
"""Disconnect the current Telnet connection."""
try:
self._telnet.write('exit\n'.encode('ascii'))
except Exception: # pylint: disable=broad-except
pass
super().disconnect()

View File

@ -87,6 +87,9 @@ abodepy==0.14.0
# homeassistant.components.media_player.frontier_silicon # homeassistant.components.media_player.frontier_silicon
afsapi==0.0.4 afsapi==0.0.4
# homeassistant.components.device_tracker.asuswrt
aioasuswrt==1.0.0
# homeassistant.components.device_tracker.automatic # homeassistant.components.device_tracker.automatic
aioautomatic==0.6.5 aioautomatic==0.6.5
@ -690,7 +693,6 @@ panasonic_viera==0.3.1
pdunehd==1.3 pdunehd==1.3
# homeassistant.components.device_tracker.aruba # homeassistant.components.device_tracker.aruba
# homeassistant.components.device_tracker.asuswrt
# homeassistant.components.device_tracker.cisco_ios # homeassistant.components.device_tracker.cisco_ios
# homeassistant.components.device_tracker.unifi_direct # homeassistant.components.device_tracker.unifi_direct
# homeassistant.components.media_player.pandora # homeassistant.components.media_player.pandora

View File

@ -125,7 +125,6 @@ numpy==1.15.2
paho-mqtt==1.4.0 paho-mqtt==1.4.0
# homeassistant.components.device_tracker.aruba # homeassistant.components.device_tracker.aruba
# homeassistant.components.device_tracker.asuswrt
# homeassistant.components.device_tracker.cisco_ios # homeassistant.components.device_tracker.cisco_ios
# homeassistant.components.device_tracker.unifi_direct # homeassistant.components.device_tracker.unifi_direct
# homeassistant.components.media_player.pandora # homeassistant.components.media_player.pandora

View File

@ -3,9 +3,6 @@ import os
from datetime import timedelta from datetime import timedelta
import unittest import unittest
from unittest import mock from unittest import mock
import socket
import voluptuous as vol
from homeassistant.setup import setup_component from homeassistant.setup import setup_component
from homeassistant.components import device_tracker from homeassistant.components import device_tracker
@ -13,9 +10,7 @@ from homeassistant.components.device_tracker import (
CONF_CONSIDER_HOME, CONF_TRACK_NEW, CONF_NEW_DEVICE_DEFAULTS, CONF_CONSIDER_HOME, CONF_TRACK_NEW, CONF_NEW_DEVICE_DEFAULTS,
CONF_AWAY_HIDE) CONF_AWAY_HIDE)
from homeassistant.components.device_tracker.asuswrt import ( from homeassistant.components.device_tracker.asuswrt import (
CONF_PROTOCOL, CONF_MODE, CONF_PUB_KEY, DOMAIN, _ARP_REGEX, CONF_PROTOCOL, CONF_MODE, DOMAIN, CONF_PORT)
CONF_PORT, PLATFORM_SCHEMA, Device, get_scanner, AsusWrtDeviceScanner,
_parse_lines, SshConnection, TelnetConnection, CONF_REQUIRE_IP)
from homeassistant.const import (CONF_PLATFORM, CONF_PASSWORD, CONF_USERNAME, from homeassistant.const import (CONF_PLATFORM, CONF_PASSWORD, CONF_USERNAME,
CONF_HOST) CONF_HOST)
@ -35,85 +30,6 @@ VALID_CONFIG_ROUTER_SSH = {DOMAIN: {
CONF_PORT: '22' CONF_PORT: '22'
}} }}
WL_DATA = [
'assoclist 01:02:03:04:06:08\r',
'assoclist 08:09:10:11:12:14\r',
'assoclist 08:09:10:11:12:15\r'
]
WL_DEVICES = {
'01:02:03:04:06:08': Device(
mac='01:02:03:04:06:08', ip=None, name=None),
'08:09:10:11:12:14': Device(
mac='08:09:10:11:12:14', ip=None, name=None),
'08:09:10:11:12:15': Device(
mac='08:09:10:11:12:15', ip=None, name=None)
}
ARP_DATA = [
'? (123.123.123.125) at 01:02:03:04:06:08 [ether] on eth0\r',
'? (123.123.123.126) at 08:09:10:11:12:14 [ether] on br0\r',
'? (123.123.123.127) at <incomplete> on br0\r',
]
ARP_DEVICES = {
'01:02:03:04:06:08': Device(
mac='01:02:03:04:06:08', ip='123.123.123.125', name=None),
'08:09:10:11:12:14': Device(
mac='08:09:10:11:12:14', ip='123.123.123.126', name=None)
}
NEIGH_DATA = [
'123.123.123.125 dev eth0 lladdr 01:02:03:04:06:08 REACHABLE\r',
'123.123.123.126 dev br0 lladdr 08:09:10:11:12:14 REACHABLE\r',
'123.123.123.127 dev br0 FAILED\r',
'123.123.123.128 dev br0 lladdr 08:09:15:15:15:15 DELAY\r',
'fe80::feff:a6ff:feff:12ff dev br0 lladdr fc:ff:a6:ff:12:ff STALE\r',
]
NEIGH_DEVICES = {
'01:02:03:04:06:08': Device(
mac='01:02:03:04:06:08', ip='123.123.123.125', name=None),
'08:09:10:11:12:14': Device(
mac='08:09:10:11:12:14', ip='123.123.123.126', name=None)
}
LEASES_DATA = [
'51910 01:02:03:04:06:08 123.123.123.125 TV 01:02:03:04:06:08\r',
'79986 01:02:03:04:06:10 123.123.123.127 android 01:02:03:04:06:15\r',
'23523 08:09:10:11:12:14 123.123.123.126 * 08:09:10:11:12:14\r',
]
LEASES_DEVICES = {
'01:02:03:04:06:08': Device(
mac='01:02:03:04:06:08', ip='123.123.123.125', name='TV'),
'08:09:10:11:12:14': Device(
mac='08:09:10:11:12:14', ip='123.123.123.126', name='')
}
WAKE_DEVICES = {
'01:02:03:04:06:08': Device(
mac='01:02:03:04:06:08', ip='123.123.123.125', name='TV'),
'08:09:10:11:12:14': Device(
mac='08:09:10:11:12:14', ip='123.123.123.126', name='')
}
WAKE_DEVICES_AP = {
'01:02:03:04:06:08': Device(
mac='01:02:03:04:06:08', ip='123.123.123.125', name=None),
'08:09:10:11:12:14': Device(
mac='08:09:10:11:12:14', ip='123.123.123.126', name=None)
}
WAKE_DEVICES_NO_IP = {
'01:02:03:04:06:08': Device(
mac='01:02:03:04:06:08', ip='123.123.123.125', name=None),
'08:09:10:11:12:14': Device(
mac='08:09:10:11:12:14', ip='123.123.123.126', name=None),
'08:09:10:11:12:15': Device(
mac='08:09:10:11:12:15', ip=None, name=None)
}
def setup_module(): def setup_module():
"""Set up the test module.""" """Set up the test module."""
@ -150,24 +66,6 @@ class TestComponentsDeviceTrackerASUSWRT(unittest.TestCase):
except FileNotFoundError: except FileNotFoundError:
pass pass
def test_parse_lines_wrong_input(self):
"""Testing parse lines."""
output = _parse_lines("asdf asdfdfsafad", _ARP_REGEX)
self.assertEqual(output, [])
def test_get_device_name(self):
"""Test for getting name."""
scanner = get_scanner(self.hass, VALID_CONFIG_ROUTER_SSH)
scanner.last_results = WAKE_DEVICES
self.assertEqual('TV', scanner.get_device_name('01:02:03:04:06:08'))
self.assertEqual(None, scanner.get_device_name('01:02:03:04:08:08'))
def test_scan_devices(self):
"""Test for scan devices."""
scanner = get_scanner(self.hass, VALID_CONFIG_ROUTER_SSH)
scanner.last_results = WAKE_DEVICES
self.assertEqual(list(WAKE_DEVICES), scanner.scan_devices())
def test_password_or_pub_key_required(self): def test_password_or_pub_key_required(self):
"""Test creating an AsusWRT scanner without a pass or pubkey.""" """Test creating an AsusWRT scanner without a pass or pubkey."""
with assert_setup_component(0, DOMAIN): with assert_setup_component(0, DOMAIN):
@ -207,377 +105,3 @@ class TestComponentsDeviceTrackerASUSWRT(unittest.TestCase):
conf_dict[DOMAIN][CONF_PORT] = 22 conf_dict[DOMAIN][CONF_PORT] = 22
self.assertEqual(asuswrt_mock.call_count, 1) self.assertEqual(asuswrt_mock.call_count, 1)
self.assertEqual(asuswrt_mock.call_args, mock.call(conf_dict[DOMAIN])) self.assertEqual(asuswrt_mock.call_args, mock.call(conf_dict[DOMAIN]))
@mock.patch(
'homeassistant.components.device_tracker.asuswrt.AsusWrtDeviceScanner',
return_value=mock.MagicMock())
def test_get_scanner_with_pubkey_no_password(self, asuswrt_mock):
"""Test creating an AsusWRT scanner with a pubkey and no password."""
conf_dict = {
device_tracker.DOMAIN: {
CONF_PLATFORM: 'asuswrt',
CONF_HOST: 'fake_host',
CONF_USERNAME: 'fake_user',
CONF_PUB_KEY: FAKEFILE,
CONF_TRACK_NEW: True,
CONF_CONSIDER_HOME: timedelta(seconds=180),
CONF_NEW_DEVICE_DEFAULTS: {
CONF_TRACK_NEW: True,
CONF_AWAY_HIDE: False
}
}
}
with assert_setup_component(1, DOMAIN):
assert setup_component(self.hass, DOMAIN, conf_dict)
conf_dict[DOMAIN][CONF_MODE] = 'router'
conf_dict[DOMAIN][CONF_PROTOCOL] = 'ssh'
conf_dict[DOMAIN][CONF_PORT] = 22
self.assertEqual(asuswrt_mock.call_count, 1)
self.assertEqual(asuswrt_mock.call_args, mock.call(conf_dict[DOMAIN]))
def test_ssh_login_with_pub_key(self):
"""Test that login is done with pub_key when configured to."""
ssh = mock.MagicMock()
ssh_mock = mock.patch('pexpect.pxssh.pxssh', return_value=ssh)
ssh_mock.start()
self.addCleanup(ssh_mock.stop)
conf_dict = PLATFORM_SCHEMA({
CONF_PLATFORM: 'asuswrt',
CONF_HOST: 'fake_host',
CONF_USERNAME: 'fake_user',
CONF_PUB_KEY: FAKEFILE
})
update_mock = mock.patch(
'homeassistant.components.device_tracker.asuswrt.'
'AsusWrtDeviceScanner.get_asuswrt_data')
update_mock.start()
self.addCleanup(update_mock.stop)
asuswrt = device_tracker.asuswrt.AsusWrtDeviceScanner(conf_dict)
asuswrt.connection.run_command('ls')
self.assertEqual(ssh.login.call_count, 1)
self.assertEqual(
ssh.login.call_args,
mock.call('fake_host', 'fake_user', quiet=False,
ssh_key=FAKEFILE, port=22)
)
def test_ssh_login_with_password(self):
"""Test that login is done with password when configured to."""
ssh = mock.MagicMock()
ssh_mock = mock.patch('pexpect.pxssh.pxssh', return_value=ssh)
ssh_mock.start()
self.addCleanup(ssh_mock.stop)
conf_dict = PLATFORM_SCHEMA({
CONF_PLATFORM: 'asuswrt',
CONF_HOST: 'fake_host',
CONF_USERNAME: 'fake_user',
CONF_PASSWORD: 'fake_pass'
})
update_mock = mock.patch(
'homeassistant.components.device_tracker.asuswrt.'
'AsusWrtDeviceScanner.get_asuswrt_data')
update_mock.start()
self.addCleanup(update_mock.stop)
asuswrt = device_tracker.asuswrt.AsusWrtDeviceScanner(conf_dict)
asuswrt.connection.run_command('ls')
self.assertEqual(ssh.login.call_count, 1)
self.assertEqual(
ssh.login.call_args,
mock.call('fake_host', 'fake_user', quiet=False,
password='fake_pass', port=22)
)
def test_ssh_login_without_password_or_pubkey(self):
"""Test that login is not called without password or pub_key."""
ssh = mock.MagicMock()
ssh_mock = mock.patch('pexpect.pxssh.pxssh', return_value=ssh)
ssh_mock.start()
self.addCleanup(ssh_mock.stop)
conf_dict = {
CONF_PLATFORM: 'asuswrt',
CONF_HOST: 'fake_host',
CONF_USERNAME: 'fake_user',
}
with self.assertRaises(vol.Invalid):
conf_dict = PLATFORM_SCHEMA(conf_dict)
update_mock = mock.patch(
'homeassistant.components.device_tracker.asuswrt.'
'AsusWrtDeviceScanner.get_asuswrt_data')
update_mock.start()
self.addCleanup(update_mock.stop)
with assert_setup_component(0, DOMAIN):
assert setup_component(self.hass, DOMAIN,
{DOMAIN: conf_dict})
ssh.login.assert_not_called()
def test_telnet_login_with_password(self):
"""Test that login is done with password when configured to."""
telnet = mock.MagicMock()
telnet_mock = mock.patch('telnetlib.Telnet', return_value=telnet)
telnet_mock.start()
self.addCleanup(telnet_mock.stop)
conf_dict = PLATFORM_SCHEMA({
CONF_PLATFORM: 'asuswrt',
CONF_PROTOCOL: 'telnet',
CONF_HOST: 'fake_host',
CONF_USERNAME: 'fake_user',
CONF_PASSWORD: 'fake_pass'
})
update_mock = mock.patch(
'homeassistant.components.device_tracker.asuswrt.'
'AsusWrtDeviceScanner.get_asuswrt_data')
update_mock.start()
self.addCleanup(update_mock.stop)
asuswrt = device_tracker.asuswrt.AsusWrtDeviceScanner(conf_dict)
asuswrt.connection.run_command('ls')
self.assertEqual(telnet.read_until.call_count, 4)
self.assertEqual(telnet.write.call_count, 3)
self.assertEqual(
telnet.read_until.call_args_list[0],
mock.call(b'login: ')
)
self.assertEqual(
telnet.write.call_args_list[0],
mock.call(b'fake_user\n')
)
self.assertEqual(
telnet.read_until.call_args_list[1],
mock.call(b'Password: ')
)
self.assertEqual(
telnet.write.call_args_list[1],
mock.call(b'fake_pass\n')
)
self.assertEqual(
telnet.read_until.call_args_list[2],
mock.call(b'#')
)
def test_telnet_login_without_password(self):
"""Test that login is not called without password or pub_key."""
telnet = mock.MagicMock()
telnet_mock = mock.patch('telnetlib.Telnet', return_value=telnet)
telnet_mock.start()
self.addCleanup(telnet_mock.stop)
conf_dict = {
CONF_PLATFORM: 'asuswrt',
CONF_PROTOCOL: 'telnet',
CONF_HOST: 'fake_host',
CONF_USERNAME: 'fake_user',
}
with self.assertRaises(vol.Invalid):
conf_dict = PLATFORM_SCHEMA(conf_dict)
update_mock = mock.patch(
'homeassistant.components.device_tracker.asuswrt.'
'AsusWrtDeviceScanner.get_asuswrt_data')
update_mock.start()
self.addCleanup(update_mock.stop)
with assert_setup_component(0, DOMAIN):
assert setup_component(self.hass, DOMAIN,
{DOMAIN: conf_dict})
telnet.login.assert_not_called()
def test_get_asuswrt_data(self):
"""Test asuswrt data fetch."""
scanner = get_scanner(self.hass, VALID_CONFIG_ROUTER_SSH)
scanner._get_wl = mock.Mock()
scanner._get_arp = mock.Mock()
scanner._get_neigh = mock.Mock()
scanner._get_leases = mock.Mock()
scanner._get_wl.return_value = WL_DEVICES
scanner._get_arp.return_value = ARP_DEVICES
scanner._get_neigh.return_value = NEIGH_DEVICES
scanner._get_leases.return_value = LEASES_DEVICES
self.assertEqual(WAKE_DEVICES, scanner.get_asuswrt_data())
def test_get_asuswrt_data_ap(self):
"""Test for get asuswrt_data in ap mode."""
conf = VALID_CONFIG_ROUTER_SSH.copy()[DOMAIN]
conf[CONF_MODE] = 'ap'
scanner = AsusWrtDeviceScanner(conf)
scanner._get_wl = mock.Mock()
scanner._get_arp = mock.Mock()
scanner._get_neigh = mock.Mock()
scanner._get_leases = mock.Mock()
scanner._get_wl.return_value = WL_DEVICES
scanner._get_arp.return_value = ARP_DEVICES
scanner._get_neigh.return_value = NEIGH_DEVICES
scanner._get_leases.return_value = LEASES_DEVICES
self.assertEqual(WAKE_DEVICES_AP, scanner.get_asuswrt_data())
def test_get_asuswrt_data_no_ip(self):
"""Test for get asuswrt_data and not requiring ip."""
conf = VALID_CONFIG_ROUTER_SSH.copy()[DOMAIN]
conf[CONF_REQUIRE_IP] = False
scanner = AsusWrtDeviceScanner(conf)
scanner._get_wl = mock.Mock()
scanner._get_arp = mock.Mock()
scanner._get_neigh = mock.Mock()
scanner._get_leases = mock.Mock()
scanner._get_wl.return_value = WL_DEVICES
scanner._get_arp.return_value = ARP_DEVICES
scanner._get_neigh.return_value = NEIGH_DEVICES
scanner._get_leases.return_value = LEASES_DEVICES
self.assertEqual(WAKE_DEVICES_NO_IP, scanner.get_asuswrt_data())
def test_update_info(self):
"""Test for update info."""
scanner = get_scanner(self.hass, VALID_CONFIG_ROUTER_SSH)
scanner.get_asuswrt_data = mock.Mock()
scanner.get_asuswrt_data.return_value = WAKE_DEVICES
self.assertTrue(scanner._update_info())
self.assertTrue(scanner.last_results, WAKE_DEVICES)
scanner.success_init = False
self.assertFalse(scanner._update_info())
@mock.patch(
'homeassistant.components.device_tracker.asuswrt.SshConnection')
def test_get_wl(self, mocked_ssh):
"""Testing wl."""
mocked_ssh.run_command.return_value = WL_DATA
scanner = get_scanner(self.hass, VALID_CONFIG_ROUTER_SSH)
scanner.connection = mocked_ssh
self.assertEqual(WL_DEVICES, scanner._get_wl())
mocked_ssh.run_command.return_value = ''
self.assertEqual({}, scanner._get_wl())
@mock.patch(
'homeassistant.components.device_tracker.asuswrt.SshConnection')
def test_get_arp(self, mocked_ssh):
"""Testing arp."""
mocked_ssh.run_command.return_value = ARP_DATA
scanner = get_scanner(self.hass, VALID_CONFIG_ROUTER_SSH)
scanner.connection = mocked_ssh
self.assertEqual(ARP_DEVICES, scanner._get_arp())
mocked_ssh.run_command.return_value = ''
self.assertEqual({}, scanner._get_arp())
@mock.patch(
'homeassistant.components.device_tracker.asuswrt.SshConnection')
def test_get_neigh(self, mocked_ssh):
"""Testing neigh."""
mocked_ssh.run_command.return_value = NEIGH_DATA
scanner = get_scanner(self.hass, VALID_CONFIG_ROUTER_SSH)
scanner.connection = mocked_ssh
self.assertEqual(NEIGH_DEVICES, scanner._get_neigh(ARP_DEVICES.copy()))
self.assertEqual(NEIGH_DEVICES, scanner._get_neigh({
'UN:KN:WN:DE:VI:CE': Device('UN:KN:WN:DE:VI:CE', None, None),
}))
mocked_ssh.run_command.return_value = ''
self.assertEqual({}, scanner._get_neigh(ARP_DEVICES.copy()))
@mock.patch(
'homeassistant.components.device_tracker.asuswrt.SshConnection')
def test_get_leases(self, mocked_ssh):
"""Testing leases."""
mocked_ssh.run_command.return_value = LEASES_DATA
scanner = get_scanner(self.hass, VALID_CONFIG_ROUTER_SSH)
scanner.connection = mocked_ssh
self.assertEqual(
LEASES_DEVICES, scanner._get_leases(NEIGH_DEVICES.copy()))
mocked_ssh.run_command.return_value = ''
self.assertEqual({}, scanner._get_leases(NEIGH_DEVICES.copy()))
@pytest.mark.skip(
reason="These tests are performing actual failing network calls. They "
"need to be cleaned up before they are re-enabled. They're frequently "
"failing in Travis.")
class TestSshConnection(unittest.TestCase):
"""Testing SshConnection."""
def setUp(self):
"""Set up test env."""
self.connection = SshConnection(
'fake', 'fake', 'fake', 'fake', 'fake')
self.connection._connected = True
def test_run_command_exception_eof(self):
"""Testing exception in run_command."""
from pexpect import exceptions
self.connection._ssh = mock.Mock()
self.connection._ssh.sendline = mock.Mock()
self.connection._ssh.sendline.side_effect = exceptions.EOF('except')
self.connection.run_command('test')
self.assertFalse(self.connection._connected)
self.assertIsNone(self.connection._ssh)
def test_run_command_exception_pxssh(self):
"""Testing exception in run_command."""
from pexpect import pxssh
self.connection._ssh = mock.Mock()
self.connection._ssh.sendline = mock.Mock()
self.connection._ssh.sendline.side_effect = pxssh.ExceptionPxssh(
'except')
self.connection.run_command('test')
self.assertFalse(self.connection._connected)
self.assertIsNone(self.connection._ssh)
def test_run_command_assertion_error(self):
"""Testing exception in run_command."""
self.connection._ssh = mock.Mock()
self.connection._ssh.sendline = mock.Mock()
self.connection._ssh.sendline.side_effect = AssertionError('except')
self.connection.run_command('test')
self.assertFalse(self.connection._connected)
self.assertIsNone(self.connection._ssh)
@pytest.mark.skip(
reason="These tests are performing actual failing network calls. They "
"need to be cleaned up before they are re-enabled. They're frequently "
"failing in Travis.")
class TestTelnetConnection(unittest.TestCase):
"""Testing TelnetConnection."""
def setUp(self):
"""Set up test env."""
self.connection = TelnetConnection(
'fake', 'fake', 'fake', 'fake')
self.connection._connected = True
def test_run_command_exception_eof(self):
"""Testing EOFException in run_command."""
self.connection._telnet = mock.Mock()
self.connection._telnet.write = mock.Mock()
self.connection._telnet.write.side_effect = EOFError('except')
self.connection.run_command('test')
self.assertFalse(self.connection._connected)
def test_run_command_exception_connection_refused(self):
"""Testing ConnectionRefusedError in run_command."""
self.connection._telnet = mock.Mock()
self.connection._telnet.write = mock.Mock()
self.connection._telnet.write.side_effect = ConnectionRefusedError(
'except')
self.connection.run_command('test')
self.assertFalse(self.connection._connected)
def test_run_command_exception_gaierror(self):
"""Testing socket.gaierror in run_command."""
self.connection._telnet = mock.Mock()
self.connection._telnet.write = mock.Mock()
self.connection._telnet.write.side_effect = socket.gaierror('except')
self.connection.run_command('test')
self.assertFalse(self.connection._connected)
def test_run_command_exception_oserror(self):
"""Testing OSError in run_command."""
self.connection._telnet = mock.Mock()
self.connection._telnet.write = mock.Mock()
self.connection._telnet.write.side_effect = OSError('except')
self.connection.run_command('test')
self.assertFalse(self.connection._connected)