From 277a9a39955d636b7c828a09d0ad1b94ce3a54b7 Mon Sep 17 00:00:00 2001 From: kennedyshead Date: Tue, 23 Oct 2018 11:08:11 +0200 Subject: [PATCH] Async version for asuswrt (#17692) * Testing async data for asuswrt * Moved to lib --- CODEOWNERS | 1 + .../components/device_tracker/asuswrt.py | 336 +----------- requirements_all.txt | 4 +- requirements_test_all.txt | 1 - .../components/device_tracker/test_asuswrt.py | 478 +----------------- 5 files changed, 26 insertions(+), 794 deletions(-) diff --git a/CODEOWNERS b/CODEOWNERS index c49af4864a9..2bf31378ac3 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -58,6 +58,7 @@ homeassistant/components/climate/mill.py @danielhiversen homeassistant/components/climate/sensibo.py @andrey-git homeassistant/components/cover/group.py @cdce8p homeassistant/components/cover/template.py @PhracturedBlue +homeassistant/components/device_tracker/asuswrt.py @kennedyshead homeassistant/components/device_tracker/automatic.py @armills homeassistant/components/device_tracker/huawei_router.py @abmantis homeassistant/components/device_tracker/quantum_gateway.py @cisasteelersfan diff --git a/homeassistant/components/device_tracker/asuswrt.py b/homeassistant/components/device_tracker/asuswrt.py index 710a07f77d3..461380b2c8e 100644 --- a/homeassistant/components/device_tracker/asuswrt.py +++ b/homeassistant/components/device_tracker/asuswrt.py @@ -5,10 +5,6 @@ For more details about this platform, please refer to the documentation at https://home-assistant.io/components/device_tracker.asuswrt/ """ import logging -import re -import socket -import telnetlib -from collections import namedtuple import voluptuous as vol @@ -19,7 +15,7 @@ from homeassistant.const import ( CONF_HOST, CONF_PASSWORD, CONF_USERNAME, CONF_PORT, CONF_MODE, CONF_PROTOCOL) -REQUIREMENTS = ['pexpect==4.6.0'] +REQUIREMENTS = ['aioasuswrt==1.0.0'] _LOGGER = logging.getLogger(__name__) @@ -44,345 +40,55 @@ PLATFORM_SCHEMA = vol.All( })) -_LEASES_CMD = 'cat /var/lib/misc/dnsmasq.leases' -_LEASES_REGEX = re.compile( - r'\w+\s' + - r'(?P(([0-9a-f]{2}[:-]){5}([0-9a-f]{2})))\s' + - r'(?P([0-9]{1,3}[\.]){3}[0-9]{1,3})\s' + - r'(?P([^\s]+))') - -# Command to get both 5GHz and 2.4GHz clients -_WL_CMD = 'for dev in `nvram get wl_ifnames`; do wl -i $dev assoclist; done' -_WL_REGEX = re.compile( - r'\w+\s' + - r'(?P(([0-9A-F]{2}[:-]){5}([0-9A-F]{2})))') - -_IP_NEIGH_CMD = 'ip neigh' -_IP_NEIGH_REGEX = re.compile( - r'(?P([0-9]{1,3}[\.]){3}[0-9]{1,3}|' - r'([0-9a-fA-F]{1,4}:){1,7}[0-9a-fA-F]{0,4}(:[0-9a-fA-F]{1,4}){1,7})\s' - r'\w+\s' - r'\w+\s' - r'(\w+\s(?P(([0-9a-f]{2}[:-]){5}([0-9a-f]{2}))))?\s' - r'\s?(router)?' - r'\s?(nud)?' - r'(?P(\w+))') - -_ARP_CMD = 'arp -n' -_ARP_REGEX = re.compile( - r'.+\s' + - r'\((?P([0-9]{1,3}[\.]){3}[0-9]{1,3})\)\s' + - r'.+\s' + - r'(?P(([0-9a-f]{2}[:-]){5}([0-9a-f]{2})))' + - r'\s' + - r'.*') - - -def get_scanner(hass, config): +async def async_get_scanner(hass, config): """Validate the configuration and return an ASUS-WRT scanner.""" scanner = AsusWrtDeviceScanner(config[DOMAIN]) - + await scanner.async_connect() return scanner if scanner.success_init else None -def _parse_lines(lines, regex): - """Parse the lines using the given regular expression. - - If a line can't be parsed it is logged and skipped in the output. - """ - results = [] - for line in lines: - match = regex.search(line) - if not match: - _LOGGER.debug("Could not parse row: %s", line) - continue - results.append(match.groupdict()) - return results - - -Device = namedtuple('Device', ['mac', 'ip', 'name']) - - class AsusWrtDeviceScanner(DeviceScanner): """This class queries a router running ASUSWRT firmware.""" # Eighth attribute needed for mode (AP mode vs router mode) def __init__(self, config): """Initialize the scanner.""" - self.host = config[CONF_HOST] - self.username = config[CONF_USERNAME] - self.password = config.get(CONF_PASSWORD, '') - self.ssh_key = config.get('ssh_key', config.get('pub_key', '')) - self.protocol = config[CONF_PROTOCOL] - self.mode = config[CONF_MODE] - self.port = config[CONF_PORT] - self.require_ip = config[CONF_REQUIRE_IP] + from aioasuswrt.asuswrt import AsusWrt - if self.protocol == 'ssh': - self.connection = SshConnection( - self.host, self.port, self.username, self.password, - self.ssh_key) - else: - self.connection = TelnetConnection( - self.host, self.port, self.username, self.password) + self.last_results = {} + self.success_init = False + self.connection = AsusWrt(config[CONF_HOST], config[CONF_PORT], + config[CONF_PROTOCOL] == 'telnet', + config[CONF_USERNAME], + config.get(CONF_PASSWORD, ''), + config.get('ssh_key', + config.get('pub_key', '')), + config[CONF_MODE], config[CONF_REQUIRE_IP]) + async def async_connect(self): + """Initialize connection to the router.""" self.last_results = {} # Test the router is accessible. - data = self.get_asuswrt_data() + data = await self.connection.async_get_connected_devices() self.success_init = data is not None - def scan_devices(self): + async def async_scan_devices(self): """Scan for new devices and return a list with found device IDs.""" - self._update_info() + await self.async_update_info() return list(self.last_results.keys()) - def get_device_name(self, device): + async def async_get_device_name(self, device): """Return the name of the given device or None if we don't know.""" if device not in self.last_results: return None return self.last_results[device].name - def _update_info(self): + async def async_update_info(self): """Ensure the information from the ASUSWRT router is up to date. Return boolean if scanning successful. """ - if not self.success_init: - return False - _LOGGER.info('Checking Devices') - data = self.get_asuswrt_data() - if not data: - return False - self.last_results = data - return True - - def get_asuswrt_data(self): - """Retrieve data from ASUSWRT. - - Calls various commands on the router and returns the superset of all - responses. Some commands will not work on some routers. - """ - devices = {} - devices.update(self._get_wl()) - devices.update(self._get_arp()) - devices.update(self._get_neigh(devices)) - if not self.mode == 'ap': - devices.update(self._get_leases(devices)) - - ret_devices = {} - for key in devices: - if not self.require_ip or devices[key].ip is not None: - ret_devices[key] = devices[key] - return ret_devices - - def _get_wl(self): - lines = self.connection.run_command(_WL_CMD) - if not lines: - return {} - result = _parse_lines(lines, _WL_REGEX) - devices = {} - for device in result: - mac = device['mac'].upper() - devices[mac] = Device(mac, None, None) - return devices - - def _get_leases(self, cur_devices): - lines = self.connection.run_command(_LEASES_CMD) - if not lines: - return {} - lines = [line for line in lines if not line.startswith('duid ')] - result = _parse_lines(lines, _LEASES_REGEX) - devices = {} - for device in result: - # For leases where the client doesn't set a hostname, ensure it - # is blank and not '*', which breaks entity_id down the line. - host = device['host'] - if host == '*': - host = '' - mac = device['mac'].upper() - if mac in cur_devices: - devices[mac] = Device(mac, device['ip'], host) - return devices - - def _get_neigh(self, cur_devices): - lines = self.connection.run_command(_IP_NEIGH_CMD) - if not lines: - return {} - result = _parse_lines(lines, _IP_NEIGH_REGEX) - devices = {} - for device in result: - status = device['status'] - if status is None or status.upper() != 'REACHABLE': - continue - if device['mac'] is not None: - mac = device['mac'].upper() - old_device = cur_devices.get(mac) - old_ip = old_device.ip if old_device else None - devices[mac] = Device(mac, device.get('ip', old_ip), None) - return devices - - def _get_arp(self): - lines = self.connection.run_command(_ARP_CMD) - if not lines: - return {} - result = _parse_lines(lines, _ARP_REGEX) - devices = {} - for device in result: - if device['mac'] is not None: - mac = device['mac'].upper() - devices[mac] = Device(mac, device['ip'], None) - return devices - - -class _Connection: - def __init__(self): - self._connected = False - - @property - def connected(self): - """Return connection state.""" - return self._connected - - def connect(self): - """Mark current connection state as connected.""" - self._connected = True - - def disconnect(self): - """Mark current connection state as disconnected.""" - self._connected = False - - -class SshConnection(_Connection): - """Maintains an SSH connection to an ASUS-WRT router.""" - - def __init__(self, host, port, username, password, ssh_key): - """Initialize the SSH connection properties.""" - super().__init__() - - self._ssh = None - self._host = host - self._port = port - self._username = username - self._password = password - self._ssh_key = ssh_key - - def run_command(self, command): - """Run commands through an SSH connection. - - Connect to the SSH server if not currently connected, otherwise - use the existing connection. - """ - from pexpect import pxssh, exceptions - - try: - if not self.connected: - self.connect() - self._ssh.sendline(command) - self._ssh.prompt() - lines = self._ssh.before.split(b'\n')[1:-1] - return [line.decode('utf-8') for line in lines] - except exceptions.EOF as err: - _LOGGER.error("Connection refused. %s", self._ssh.before) - self.disconnect() - return None - except pxssh.ExceptionPxssh as err: - _LOGGER.error("Unexpected SSH error: %s", err) - self.disconnect() - return None - except AssertionError as err: - _LOGGER.error("Connection to router unavailable: %s", err) - self.disconnect() - return None - - def connect(self): - """Connect to the ASUS-WRT SSH server.""" - from pexpect import pxssh - - self._ssh = pxssh.pxssh() - if self._ssh_key: - self._ssh.login(self._host, self._username, quiet=False, - ssh_key=self._ssh_key, port=self._port) - else: - self._ssh.login(self._host, self._username, quiet=False, - password=self._password, port=self._port) - - super().connect() - - def disconnect(self): - """Disconnect the current SSH connection.""" - try: - self._ssh.logout() - except Exception: # pylint: disable=broad-except - pass - finally: - self._ssh = None - - super().disconnect() - - -class TelnetConnection(_Connection): - """Maintains a Telnet connection to an ASUS-WRT router.""" - - def __init__(self, host, port, username, password): - """Initialize the Telnet connection properties.""" - super().__init__() - - self._telnet = None - self._host = host - self._port = port - self._username = username - self._password = password - self._prompt_string = None - - def run_command(self, command): - """Run a command through a Telnet connection. - - Connect to the Telnet server if not currently connected, otherwise - use the existing connection. - """ - try: - if not self.connected: - self.connect() - self._telnet.write('{}\n'.format(command).encode('ascii')) - data = (self._telnet.read_until(self._prompt_string). - split(b'\n')[1:-1]) - return [line.decode('utf-8') for line in data] - except EOFError: - _LOGGER.error("Unexpected response from router") - self.disconnect() - return None - except ConnectionRefusedError: - _LOGGER.error("Connection refused by router. Telnet enabled?") - self.disconnect() - return None - except socket.gaierror as exc: - _LOGGER.error("Socket exception: %s", exc) - self.disconnect() - return None - except OSError as exc: - _LOGGER.error("OSError: %s", exc) - self.disconnect() - return None - - def connect(self): - """Connect to the ASUS-WRT Telnet server.""" - self._telnet = telnetlib.Telnet(self._host) - self._telnet.read_until(b'login: ') - self._telnet.write((self._username + '\n').encode('ascii')) - self._telnet.read_until(b'Password: ') - self._telnet.write((self._password + '\n').encode('ascii')) - self._prompt_string = self._telnet.read_until(b'#').split(b'\n')[-1] - - super().connect() - - def disconnect(self): - """Disconnect the current Telnet connection.""" - try: - self._telnet.write('exit\n'.encode('ascii')) - except Exception: # pylint: disable=broad-except - pass - - super().disconnect() + self.last_results = await self.connection.async_get_connected_devices() diff --git a/requirements_all.txt b/requirements_all.txt index 5d7de61eadf..ffd2be0b3bc 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -87,6 +87,9 @@ abodepy==0.14.0 # homeassistant.components.media_player.frontier_silicon afsapi==0.0.4 +# homeassistant.components.device_tracker.asuswrt +aioasuswrt==1.0.0 + # homeassistant.components.device_tracker.automatic aioautomatic==0.6.5 @@ -690,7 +693,6 @@ panasonic_viera==0.3.1 pdunehd==1.3 # homeassistant.components.device_tracker.aruba -# homeassistant.components.device_tracker.asuswrt # homeassistant.components.device_tracker.cisco_ios # homeassistant.components.device_tracker.unifi_direct # homeassistant.components.media_player.pandora diff --git a/requirements_test_all.txt b/requirements_test_all.txt index a6b7f14f06e..fe02cb0fab1 100644 --- a/requirements_test_all.txt +++ b/requirements_test_all.txt @@ -125,7 +125,6 @@ numpy==1.15.2 paho-mqtt==1.4.0 # homeassistant.components.device_tracker.aruba -# homeassistant.components.device_tracker.asuswrt # homeassistant.components.device_tracker.cisco_ios # homeassistant.components.device_tracker.unifi_direct # homeassistant.components.media_player.pandora diff --git a/tests/components/device_tracker/test_asuswrt.py b/tests/components/device_tracker/test_asuswrt.py index 8c5af618288..d43a7d53969 100644 --- a/tests/components/device_tracker/test_asuswrt.py +++ b/tests/components/device_tracker/test_asuswrt.py @@ -3,9 +3,6 @@ import os from datetime import timedelta import unittest from unittest import mock -import socket - -import voluptuous as vol from homeassistant.setup import setup_component from homeassistant.components import device_tracker @@ -13,9 +10,7 @@ from homeassistant.components.device_tracker import ( CONF_CONSIDER_HOME, CONF_TRACK_NEW, CONF_NEW_DEVICE_DEFAULTS, CONF_AWAY_HIDE) from homeassistant.components.device_tracker.asuswrt import ( - CONF_PROTOCOL, CONF_MODE, CONF_PUB_KEY, DOMAIN, _ARP_REGEX, - CONF_PORT, PLATFORM_SCHEMA, Device, get_scanner, AsusWrtDeviceScanner, - _parse_lines, SshConnection, TelnetConnection, CONF_REQUIRE_IP) + CONF_PROTOCOL, CONF_MODE, DOMAIN, CONF_PORT) from homeassistant.const import (CONF_PLATFORM, CONF_PASSWORD, CONF_USERNAME, CONF_HOST) @@ -35,85 +30,6 @@ VALID_CONFIG_ROUTER_SSH = {DOMAIN: { CONF_PORT: '22' }} -WL_DATA = [ - 'assoclist 01:02:03:04:06:08\r', - 'assoclist 08:09:10:11:12:14\r', - 'assoclist 08:09:10:11:12:15\r' -] - -WL_DEVICES = { - '01:02:03:04:06:08': Device( - mac='01:02:03:04:06:08', ip=None, name=None), - '08:09:10:11:12:14': Device( - mac='08:09:10:11:12:14', ip=None, name=None), - '08:09:10:11:12:15': Device( - mac='08:09:10:11:12:15', ip=None, name=None) -} - -ARP_DATA = [ - '? (123.123.123.125) at 01:02:03:04:06:08 [ether] on eth0\r', - '? (123.123.123.126) at 08:09:10:11:12:14 [ether] on br0\r', - '? (123.123.123.127) at on br0\r', -] - -ARP_DEVICES = { - '01:02:03:04:06:08': Device( - mac='01:02:03:04:06:08', ip='123.123.123.125', name=None), - '08:09:10:11:12:14': Device( - mac='08:09:10:11:12:14', ip='123.123.123.126', name=None) -} - -NEIGH_DATA = [ - '123.123.123.125 dev eth0 lladdr 01:02:03:04:06:08 REACHABLE\r', - '123.123.123.126 dev br0 lladdr 08:09:10:11:12:14 REACHABLE\r', - '123.123.123.127 dev br0 FAILED\r', - '123.123.123.128 dev br0 lladdr 08:09:15:15:15:15 DELAY\r', - 'fe80::feff:a6ff:feff:12ff dev br0 lladdr fc:ff:a6:ff:12:ff STALE\r', -] - -NEIGH_DEVICES = { - '01:02:03:04:06:08': Device( - mac='01:02:03:04:06:08', ip='123.123.123.125', name=None), - '08:09:10:11:12:14': Device( - mac='08:09:10:11:12:14', ip='123.123.123.126', name=None) -} - -LEASES_DATA = [ - '51910 01:02:03:04:06:08 123.123.123.125 TV 01:02:03:04:06:08\r', - '79986 01:02:03:04:06:10 123.123.123.127 android 01:02:03:04:06:15\r', - '23523 08:09:10:11:12:14 123.123.123.126 * 08:09:10:11:12:14\r', -] - -LEASES_DEVICES = { - '01:02:03:04:06:08': Device( - mac='01:02:03:04:06:08', ip='123.123.123.125', name='TV'), - '08:09:10:11:12:14': Device( - mac='08:09:10:11:12:14', ip='123.123.123.126', name='') -} - -WAKE_DEVICES = { - '01:02:03:04:06:08': Device( - mac='01:02:03:04:06:08', ip='123.123.123.125', name='TV'), - '08:09:10:11:12:14': Device( - mac='08:09:10:11:12:14', ip='123.123.123.126', name='') -} - -WAKE_DEVICES_AP = { - '01:02:03:04:06:08': Device( - mac='01:02:03:04:06:08', ip='123.123.123.125', name=None), - '08:09:10:11:12:14': Device( - mac='08:09:10:11:12:14', ip='123.123.123.126', name=None) -} - -WAKE_DEVICES_NO_IP = { - '01:02:03:04:06:08': Device( - mac='01:02:03:04:06:08', ip='123.123.123.125', name=None), - '08:09:10:11:12:14': Device( - mac='08:09:10:11:12:14', ip='123.123.123.126', name=None), - '08:09:10:11:12:15': Device( - mac='08:09:10:11:12:15', ip=None, name=None) -} - def setup_module(): """Set up the test module.""" @@ -150,24 +66,6 @@ class TestComponentsDeviceTrackerASUSWRT(unittest.TestCase): except FileNotFoundError: pass - def test_parse_lines_wrong_input(self): - """Testing parse lines.""" - output = _parse_lines("asdf asdfdfsafad", _ARP_REGEX) - self.assertEqual(output, []) - - def test_get_device_name(self): - """Test for getting name.""" - scanner = get_scanner(self.hass, VALID_CONFIG_ROUTER_SSH) - scanner.last_results = WAKE_DEVICES - self.assertEqual('TV', scanner.get_device_name('01:02:03:04:06:08')) - self.assertEqual(None, scanner.get_device_name('01:02:03:04:08:08')) - - def test_scan_devices(self): - """Test for scan devices.""" - scanner = get_scanner(self.hass, VALID_CONFIG_ROUTER_SSH) - scanner.last_results = WAKE_DEVICES - self.assertEqual(list(WAKE_DEVICES), scanner.scan_devices()) - def test_password_or_pub_key_required(self): """Test creating an AsusWRT scanner without a pass or pubkey.""" with assert_setup_component(0, DOMAIN): @@ -207,377 +105,3 @@ class TestComponentsDeviceTrackerASUSWRT(unittest.TestCase): conf_dict[DOMAIN][CONF_PORT] = 22 self.assertEqual(asuswrt_mock.call_count, 1) self.assertEqual(asuswrt_mock.call_args, mock.call(conf_dict[DOMAIN])) - - @mock.patch( - 'homeassistant.components.device_tracker.asuswrt.AsusWrtDeviceScanner', - return_value=mock.MagicMock()) - def test_get_scanner_with_pubkey_no_password(self, asuswrt_mock): - """Test creating an AsusWRT scanner with a pubkey and no password.""" - conf_dict = { - device_tracker.DOMAIN: { - CONF_PLATFORM: 'asuswrt', - CONF_HOST: 'fake_host', - CONF_USERNAME: 'fake_user', - CONF_PUB_KEY: FAKEFILE, - CONF_TRACK_NEW: True, - CONF_CONSIDER_HOME: timedelta(seconds=180), - CONF_NEW_DEVICE_DEFAULTS: { - CONF_TRACK_NEW: True, - CONF_AWAY_HIDE: False - } - } - } - - with assert_setup_component(1, DOMAIN): - assert setup_component(self.hass, DOMAIN, conf_dict) - - conf_dict[DOMAIN][CONF_MODE] = 'router' - conf_dict[DOMAIN][CONF_PROTOCOL] = 'ssh' - conf_dict[DOMAIN][CONF_PORT] = 22 - self.assertEqual(asuswrt_mock.call_count, 1) - self.assertEqual(asuswrt_mock.call_args, mock.call(conf_dict[DOMAIN])) - - def test_ssh_login_with_pub_key(self): - """Test that login is done with pub_key when configured to.""" - ssh = mock.MagicMock() - ssh_mock = mock.patch('pexpect.pxssh.pxssh', return_value=ssh) - ssh_mock.start() - self.addCleanup(ssh_mock.stop) - conf_dict = PLATFORM_SCHEMA({ - CONF_PLATFORM: 'asuswrt', - CONF_HOST: 'fake_host', - CONF_USERNAME: 'fake_user', - CONF_PUB_KEY: FAKEFILE - }) - update_mock = mock.patch( - 'homeassistant.components.device_tracker.asuswrt.' - 'AsusWrtDeviceScanner.get_asuswrt_data') - update_mock.start() - self.addCleanup(update_mock.stop) - asuswrt = device_tracker.asuswrt.AsusWrtDeviceScanner(conf_dict) - asuswrt.connection.run_command('ls') - self.assertEqual(ssh.login.call_count, 1) - self.assertEqual( - ssh.login.call_args, - mock.call('fake_host', 'fake_user', quiet=False, - ssh_key=FAKEFILE, port=22) - ) - - def test_ssh_login_with_password(self): - """Test that login is done with password when configured to.""" - ssh = mock.MagicMock() - ssh_mock = mock.patch('pexpect.pxssh.pxssh', return_value=ssh) - ssh_mock.start() - self.addCleanup(ssh_mock.stop) - conf_dict = PLATFORM_SCHEMA({ - CONF_PLATFORM: 'asuswrt', - CONF_HOST: 'fake_host', - CONF_USERNAME: 'fake_user', - CONF_PASSWORD: 'fake_pass' - }) - update_mock = mock.patch( - 'homeassistant.components.device_tracker.asuswrt.' - 'AsusWrtDeviceScanner.get_asuswrt_data') - update_mock.start() - self.addCleanup(update_mock.stop) - asuswrt = device_tracker.asuswrt.AsusWrtDeviceScanner(conf_dict) - asuswrt.connection.run_command('ls') - self.assertEqual(ssh.login.call_count, 1) - self.assertEqual( - ssh.login.call_args, - mock.call('fake_host', 'fake_user', quiet=False, - password='fake_pass', port=22) - ) - - def test_ssh_login_without_password_or_pubkey(self): - """Test that login is not called without password or pub_key.""" - ssh = mock.MagicMock() - ssh_mock = mock.patch('pexpect.pxssh.pxssh', return_value=ssh) - ssh_mock.start() - self.addCleanup(ssh_mock.stop) - - conf_dict = { - CONF_PLATFORM: 'asuswrt', - CONF_HOST: 'fake_host', - CONF_USERNAME: 'fake_user', - } - - with self.assertRaises(vol.Invalid): - conf_dict = PLATFORM_SCHEMA(conf_dict) - - update_mock = mock.patch( - 'homeassistant.components.device_tracker.asuswrt.' - 'AsusWrtDeviceScanner.get_asuswrt_data') - update_mock.start() - self.addCleanup(update_mock.stop) - - with assert_setup_component(0, DOMAIN): - assert setup_component(self.hass, DOMAIN, - {DOMAIN: conf_dict}) - ssh.login.assert_not_called() - - def test_telnet_login_with_password(self): - """Test that login is done with password when configured to.""" - telnet = mock.MagicMock() - telnet_mock = mock.patch('telnetlib.Telnet', return_value=telnet) - telnet_mock.start() - self.addCleanup(telnet_mock.stop) - conf_dict = PLATFORM_SCHEMA({ - CONF_PLATFORM: 'asuswrt', - CONF_PROTOCOL: 'telnet', - CONF_HOST: 'fake_host', - CONF_USERNAME: 'fake_user', - CONF_PASSWORD: 'fake_pass' - }) - update_mock = mock.patch( - 'homeassistant.components.device_tracker.asuswrt.' - 'AsusWrtDeviceScanner.get_asuswrt_data') - update_mock.start() - self.addCleanup(update_mock.stop) - asuswrt = device_tracker.asuswrt.AsusWrtDeviceScanner(conf_dict) - asuswrt.connection.run_command('ls') - self.assertEqual(telnet.read_until.call_count, 4) - self.assertEqual(telnet.write.call_count, 3) - self.assertEqual( - telnet.read_until.call_args_list[0], - mock.call(b'login: ') - ) - self.assertEqual( - telnet.write.call_args_list[0], - mock.call(b'fake_user\n') - ) - self.assertEqual( - telnet.read_until.call_args_list[1], - mock.call(b'Password: ') - ) - self.assertEqual( - telnet.write.call_args_list[1], - mock.call(b'fake_pass\n') - ) - self.assertEqual( - telnet.read_until.call_args_list[2], - mock.call(b'#') - ) - - def test_telnet_login_without_password(self): - """Test that login is not called without password or pub_key.""" - telnet = mock.MagicMock() - telnet_mock = mock.patch('telnetlib.Telnet', return_value=telnet) - telnet_mock.start() - self.addCleanup(telnet_mock.stop) - - conf_dict = { - CONF_PLATFORM: 'asuswrt', - CONF_PROTOCOL: 'telnet', - CONF_HOST: 'fake_host', - CONF_USERNAME: 'fake_user', - } - - with self.assertRaises(vol.Invalid): - conf_dict = PLATFORM_SCHEMA(conf_dict) - - update_mock = mock.patch( - 'homeassistant.components.device_tracker.asuswrt.' - 'AsusWrtDeviceScanner.get_asuswrt_data') - update_mock.start() - self.addCleanup(update_mock.stop) - - with assert_setup_component(0, DOMAIN): - assert setup_component(self.hass, DOMAIN, - {DOMAIN: conf_dict}) - telnet.login.assert_not_called() - - def test_get_asuswrt_data(self): - """Test asuswrt data fetch.""" - scanner = get_scanner(self.hass, VALID_CONFIG_ROUTER_SSH) - scanner._get_wl = mock.Mock() - scanner._get_arp = mock.Mock() - scanner._get_neigh = mock.Mock() - scanner._get_leases = mock.Mock() - scanner._get_wl.return_value = WL_DEVICES - scanner._get_arp.return_value = ARP_DEVICES - scanner._get_neigh.return_value = NEIGH_DEVICES - scanner._get_leases.return_value = LEASES_DEVICES - self.assertEqual(WAKE_DEVICES, scanner.get_asuswrt_data()) - - def test_get_asuswrt_data_ap(self): - """Test for get asuswrt_data in ap mode.""" - conf = VALID_CONFIG_ROUTER_SSH.copy()[DOMAIN] - conf[CONF_MODE] = 'ap' - scanner = AsusWrtDeviceScanner(conf) - scanner._get_wl = mock.Mock() - scanner._get_arp = mock.Mock() - scanner._get_neigh = mock.Mock() - scanner._get_leases = mock.Mock() - scanner._get_wl.return_value = WL_DEVICES - scanner._get_arp.return_value = ARP_DEVICES - scanner._get_neigh.return_value = NEIGH_DEVICES - scanner._get_leases.return_value = LEASES_DEVICES - self.assertEqual(WAKE_DEVICES_AP, scanner.get_asuswrt_data()) - - def test_get_asuswrt_data_no_ip(self): - """Test for get asuswrt_data and not requiring ip.""" - conf = VALID_CONFIG_ROUTER_SSH.copy()[DOMAIN] - conf[CONF_REQUIRE_IP] = False - scanner = AsusWrtDeviceScanner(conf) - scanner._get_wl = mock.Mock() - scanner._get_arp = mock.Mock() - scanner._get_neigh = mock.Mock() - scanner._get_leases = mock.Mock() - scanner._get_wl.return_value = WL_DEVICES - scanner._get_arp.return_value = ARP_DEVICES - scanner._get_neigh.return_value = NEIGH_DEVICES - scanner._get_leases.return_value = LEASES_DEVICES - self.assertEqual(WAKE_DEVICES_NO_IP, scanner.get_asuswrt_data()) - - def test_update_info(self): - """Test for update info.""" - scanner = get_scanner(self.hass, VALID_CONFIG_ROUTER_SSH) - scanner.get_asuswrt_data = mock.Mock() - scanner.get_asuswrt_data.return_value = WAKE_DEVICES - self.assertTrue(scanner._update_info()) - self.assertTrue(scanner.last_results, WAKE_DEVICES) - scanner.success_init = False - self.assertFalse(scanner._update_info()) - - @mock.patch( - 'homeassistant.components.device_tracker.asuswrt.SshConnection') - def test_get_wl(self, mocked_ssh): - """Testing wl.""" - mocked_ssh.run_command.return_value = WL_DATA - scanner = get_scanner(self.hass, VALID_CONFIG_ROUTER_SSH) - scanner.connection = mocked_ssh - self.assertEqual(WL_DEVICES, scanner._get_wl()) - mocked_ssh.run_command.return_value = '' - self.assertEqual({}, scanner._get_wl()) - - @mock.patch( - 'homeassistant.components.device_tracker.asuswrt.SshConnection') - def test_get_arp(self, mocked_ssh): - """Testing arp.""" - mocked_ssh.run_command.return_value = ARP_DATA - - scanner = get_scanner(self.hass, VALID_CONFIG_ROUTER_SSH) - scanner.connection = mocked_ssh - self.assertEqual(ARP_DEVICES, scanner._get_arp()) - mocked_ssh.run_command.return_value = '' - self.assertEqual({}, scanner._get_arp()) - - @mock.patch( - 'homeassistant.components.device_tracker.asuswrt.SshConnection') - def test_get_neigh(self, mocked_ssh): - """Testing neigh.""" - mocked_ssh.run_command.return_value = NEIGH_DATA - - scanner = get_scanner(self.hass, VALID_CONFIG_ROUTER_SSH) - scanner.connection = mocked_ssh - self.assertEqual(NEIGH_DEVICES, scanner._get_neigh(ARP_DEVICES.copy())) - self.assertEqual(NEIGH_DEVICES, scanner._get_neigh({ - 'UN:KN:WN:DE:VI:CE': Device('UN:KN:WN:DE:VI:CE', None, None), - })) - mocked_ssh.run_command.return_value = '' - self.assertEqual({}, scanner._get_neigh(ARP_DEVICES.copy())) - - @mock.patch( - 'homeassistant.components.device_tracker.asuswrt.SshConnection') - def test_get_leases(self, mocked_ssh): - """Testing leases.""" - mocked_ssh.run_command.return_value = LEASES_DATA - - scanner = get_scanner(self.hass, VALID_CONFIG_ROUTER_SSH) - scanner.connection = mocked_ssh - self.assertEqual( - LEASES_DEVICES, scanner._get_leases(NEIGH_DEVICES.copy())) - mocked_ssh.run_command.return_value = '' - self.assertEqual({}, scanner._get_leases(NEIGH_DEVICES.copy())) - - -@pytest.mark.skip( - reason="These tests are performing actual failing network calls. They " - "need to be cleaned up before they are re-enabled. They're frequently " - "failing in Travis.") -class TestSshConnection(unittest.TestCase): - """Testing SshConnection.""" - - def setUp(self): - """Set up test env.""" - self.connection = SshConnection( - 'fake', 'fake', 'fake', 'fake', 'fake') - self.connection._connected = True - - def test_run_command_exception_eof(self): - """Testing exception in run_command.""" - from pexpect import exceptions - self.connection._ssh = mock.Mock() - self.connection._ssh.sendline = mock.Mock() - self.connection._ssh.sendline.side_effect = exceptions.EOF('except') - self.connection.run_command('test') - self.assertFalse(self.connection._connected) - self.assertIsNone(self.connection._ssh) - - def test_run_command_exception_pxssh(self): - """Testing exception in run_command.""" - from pexpect import pxssh - self.connection._ssh = mock.Mock() - self.connection._ssh.sendline = mock.Mock() - self.connection._ssh.sendline.side_effect = pxssh.ExceptionPxssh( - 'except') - self.connection.run_command('test') - self.assertFalse(self.connection._connected) - self.assertIsNone(self.connection._ssh) - - def test_run_command_assertion_error(self): - """Testing exception in run_command.""" - self.connection._ssh = mock.Mock() - self.connection._ssh.sendline = mock.Mock() - self.connection._ssh.sendline.side_effect = AssertionError('except') - self.connection.run_command('test') - self.assertFalse(self.connection._connected) - self.assertIsNone(self.connection._ssh) - - -@pytest.mark.skip( - reason="These tests are performing actual failing network calls. They " - "need to be cleaned up before they are re-enabled. They're frequently " - "failing in Travis.") -class TestTelnetConnection(unittest.TestCase): - """Testing TelnetConnection.""" - - def setUp(self): - """Set up test env.""" - self.connection = TelnetConnection( - 'fake', 'fake', 'fake', 'fake') - self.connection._connected = True - - def test_run_command_exception_eof(self): - """Testing EOFException in run_command.""" - self.connection._telnet = mock.Mock() - self.connection._telnet.write = mock.Mock() - self.connection._telnet.write.side_effect = EOFError('except') - self.connection.run_command('test') - self.assertFalse(self.connection._connected) - - def test_run_command_exception_connection_refused(self): - """Testing ConnectionRefusedError in run_command.""" - self.connection._telnet = mock.Mock() - self.connection._telnet.write = mock.Mock() - self.connection._telnet.write.side_effect = ConnectionRefusedError( - 'except') - self.connection.run_command('test') - self.assertFalse(self.connection._connected) - - def test_run_command_exception_gaierror(self): - """Testing socket.gaierror in run_command.""" - self.connection._telnet = mock.Mock() - self.connection._telnet.write = mock.Mock() - self.connection._telnet.write.side_effect = socket.gaierror('except') - self.connection.run_command('test') - self.assertFalse(self.connection._connected) - - def test_run_command_exception_oserror(self): - """Testing OSError in run_command.""" - self.connection._telnet = mock.Mock() - self.connection._telnet.write = mock.Mock() - self.connection._telnet.write.side_effect = OSError('except') - self.connection.run_command('test') - self.assertFalse(self.connection._connected)