Voluptuous for AsusWRT (#2998)

* Voluptuous for AsusWRT
This commit is contained in:
Johann Kellerman 2016-08-27 22:30:06 +02:00 committed by GitHub
parent f863efdaca
commit 6f1c97b9d3
2 changed files with 100 additions and 54 deletions

View File

@ -12,14 +12,36 @@ import threading
from collections import namedtuple from collections import namedtuple
from datetime import timedelta from datetime import timedelta
from homeassistant.components.device_tracker import DOMAIN import voluptuous as vol
from homeassistant.components.device_tracker import DOMAIN, PLATFORM_SCHEMA
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME
from homeassistant.helpers import validate_config
from homeassistant.util import Throttle from homeassistant.util import Throttle
import homeassistant.helpers.config_validation as cv
# Return cached results if last scan was less then this time ago. # Return cached results if last scan was less then this time ago.
MIN_TIME_BETWEEN_SCANS = timedelta(seconds=5) MIN_TIME_BETWEEN_SCANS = timedelta(seconds=5)
CONF_PROTOCOL = 'protocol'
CONF_MODE = 'mode'
CONF_SSH_KEY = 'ssh_key'
CONF_PUB_KEY = 'pub_key'
PLATFORM_SCHEMA = vol.All(
cv.has_at_least_one_key(CONF_PASSWORD, CONF_PUB_KEY, CONF_SSH_KEY),
PLATFORM_SCHEMA.extend({
vol.Required(CONF_HOST): cv.string,
vol.Required(CONF_USERNAME): cv.string,
vol.Optional(CONF_PASSWORD): cv.string,
vol.Optional(CONF_PROTOCOL, default='ssh'):
vol.Schema(['ssh', 'telnet']),
vol.Optional(CONF_MODE, default='router'):
vol.Schema(['router', 'ap']),
vol.Optional(CONF_SSH_KEY): cv.isfile,
vol.Optional(CONF_PUB_KEY): cv.isfile
}))
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
REQUIREMENTS = ['pexpect==4.0.1'] REQUIREMENTS = ['pexpect==4.0.1']
@ -57,16 +79,6 @@ _IP_NEIGH_REGEX = re.compile(
# pylint: disable=unused-argument # pylint: disable=unused-argument
def get_scanner(hass, config): def get_scanner(hass, config):
"""Validate the configuration and return an ASUS-WRT scanner.""" """Validate the configuration and return an ASUS-WRT scanner."""
if not validate_config(config,
{DOMAIN: [CONF_HOST, CONF_USERNAME]},
_LOGGER):
return None
elif CONF_PASSWORD not in config[DOMAIN] and \
'ssh_key' not in config[DOMAIN] and \
'pub_key' not in config[DOMAIN]:
_LOGGER.error('Either a private key or password must be provided')
return None
scanner = AsusWrtDeviceScanner(config[DOMAIN]) scanner = AsusWrtDeviceScanner(config[DOMAIN])
return scanner if scanner.success_init else None return scanner if scanner.success_init else None
@ -83,11 +95,11 @@ class AsusWrtDeviceScanner(object):
def __init__(self, config): def __init__(self, config):
"""Initialize the scanner.""" """Initialize the scanner."""
self.host = config[CONF_HOST] self.host = config[CONF_HOST]
self.username = str(config[CONF_USERNAME]) self.username = config[CONF_USERNAME]
self.password = str(config.get(CONF_PASSWORD, '')) self.password = config.get(CONF_PASSWORD, '')
self.ssh_key = str(config.get('ssh_key', config.get('pub_key', ''))) self.ssh_key = config.get('ssh_key', config.get('pub_key', ''))
self.protocol = config.get('protocol') self.protocol = config[CONF_PROTOCOL]
self.mode = config.get('mode') self.mode = config[CONF_MODE]
self.lock = threading.Lock() self.lock = threading.Lock()

View File

@ -1,34 +1,56 @@
"""The tests for the ASUSWRT device tracker platform.""" """The tests for the ASUSWRT device tracker platform."""
import os import os
import unittest import unittest
from unittest import mock from unittest import mock
import voluptuous as vol
from homeassistant.bootstrap import _setup_component
from homeassistant.components import device_tracker from homeassistant.components import device_tracker
from homeassistant.components.device_tracker.asuswrt import (
CONF_PROTOCOL, CONF_MODE, CONF_PUB_KEY, PLATFORM_SCHEMA, DOMAIN)
from homeassistant.const import (CONF_PLATFORM, CONF_PASSWORD, CONF_USERNAME, from homeassistant.const import (CONF_PLATFORM, CONF_PASSWORD, CONF_USERNAME,
CONF_HOST) CONF_HOST)
from tests.common import get_test_home_assistant from tests.common import get_test_home_assistant, get_test_config_dir
FAKEFILE = None
def setup_module():
"""Setup the test module."""
global FAKEFILE
FAKEFILE = get_test_config_dir('fake_file')
with open(FAKEFILE, 'w') as out:
out.write(' ')
def teardown_module():
"""Tear down the module."""
os.remove(FAKEFILE)
class TestComponentsDeviceTrackerASUSWRT(unittest.TestCase): class TestComponentsDeviceTrackerASUSWRT(unittest.TestCase):
"""Tests for the ASUSWRT device tracker platform.""" """Tests for the ASUSWRT device tracker platform."""
hass = None
def setUp(self): # pylint: disable=invalid-name def setup_method(self, _):
"""Setup things to be run when tests are started.""" """Setup things to be run when tests are started."""
self.hass = get_test_home_assistant() self.hass = get_test_home_assistant()
self.hass.config.components = ['zone']
def tearDown(self): # pylint: disable=invalid-name def teardown_method(self, _):
"""Stop everything that was started.""" """Stop everything that was started."""
try: try:
os.remove(self.hass.config.path(device_tracker.YAML_DEVICES)) os.remove(self.hass.config.path(device_tracker.YAML_DEVICES))
except FileNotFoundError: except FileNotFoundError:
pass pass
def test_password_or_pub_key_required(self): def test_password_or_pub_key_required(self): \
# pylint: disable=invalid-name
"""Test creating an AsusWRT scanner without a pass or pubkey.""" """Test creating an AsusWRT scanner without a pass or pubkey."""
self.assertIsNone(device_tracker.asuswrt.get_scanner( self.assertFalse(_setup_component(
self.hass, {device_tracker.DOMAIN: { self.hass, DOMAIN, {DOMAIN: {
CONF_PLATFORM: 'asuswrt', CONF_PLATFORM: 'asuswrt',
CONF_HOST: 'fake_host', CONF_HOST: 'fake_host',
CONF_USERNAME: 'fake_user' CONF_USERNAME: 'fake_user'
@ -37,36 +59,42 @@ class TestComponentsDeviceTrackerASUSWRT(unittest.TestCase):
@mock.patch( @mock.patch(
'homeassistant.components.device_tracker.asuswrt.AsusWrtDeviceScanner', 'homeassistant.components.device_tracker.asuswrt.AsusWrtDeviceScanner',
return_value=mock.MagicMock()) return_value=mock.MagicMock())
def test_get_scanner_with_password_no_pubkey(self, asuswrt_mock): def test_get_scanner_with_password_no_pubkey(self, asuswrt_mock): \
# pylint: disable=invalid-name
"""Test creating an AsusWRT scanner with a password and no pubkey.""" """Test creating an AsusWRT scanner with a password and no pubkey."""
conf_dict = { conf_dict = {
device_tracker.DOMAIN: { DOMAIN: {
CONF_PLATFORM: 'asuswrt', CONF_PLATFORM: 'asuswrt',
CONF_HOST: 'fake_host', CONF_HOST: 'fake_host',
CONF_USERNAME: 'fake_user', CONF_USERNAME: 'fake_user',
CONF_PASSWORD: 'fake_pass' CONF_PASSWORD: 'fake_pass'
} }
} }
self.assertIsNotNone(device_tracker.asuswrt.get_scanner( self.assertIsNotNone(_setup_component(self.hass, DOMAIN, conf_dict))
self.hass, conf_dict)) conf_dict[DOMAIN][CONF_MODE] = 'router'
asuswrt_mock.assert_called_once_with(conf_dict[device_tracker.DOMAIN]) conf_dict[DOMAIN][CONF_PROTOCOL] = 'ssh'
asuswrt_mock.assert_called_once_with(conf_dict[DOMAIN])
@mock.patch( @mock.patch(
'homeassistant.components.device_tracker.asuswrt.AsusWrtDeviceScanner', 'homeassistant.components.device_tracker.asuswrt.AsusWrtDeviceScanner',
return_value=mock.MagicMock()) return_value=mock.MagicMock())
def test_get_scanner_with_pubkey_no_password(self, asuswrt_mock): def test_get_scanner_with_pubkey_no_password(self, asuswrt_mock): \
# pylint: disable=invalid-name
"""Test creating an AsusWRT scanner with a pubkey and no password.""" """Test creating an AsusWRT scanner with a pubkey and no password."""
conf_dict = { conf_dict = {
device_tracker.DOMAIN: { device_tracker.DOMAIN: {
CONF_PLATFORM: 'asuswrt', CONF_PLATFORM: 'asuswrt',
CONF_HOST: 'fake_host', CONF_HOST: 'fake_host',
CONF_USERNAME: 'fake_user', CONF_USERNAME: 'fake_user',
'pub_key': '/fake_path' CONF_PUB_KEY: FAKEFILE
} }
} }
self.assertIsNotNone(device_tracker.asuswrt.get_scanner(
self.hass, conf_dict)) self.assertIsNotNone(_setup_component(self.hass, DOMAIN, conf_dict))
asuswrt_mock.assert_called_once_with(conf_dict[device_tracker.DOMAIN])
conf_dict[DOMAIN][CONF_MODE] = 'router'
conf_dict[DOMAIN][CONF_PROTOCOL] = 'ssh'
asuswrt_mock.assert_called_once_with(conf_dict[DOMAIN])
def test_ssh_login_with_pub_key(self): def test_ssh_login_with_pub_key(self):
"""Test that login is done with pub_key when configured to.""" """Test that login is done with pub_key when configured to."""
@ -74,12 +102,12 @@ class TestComponentsDeviceTrackerASUSWRT(unittest.TestCase):
ssh_mock = mock.patch('pexpect.pxssh.pxssh', return_value=ssh) ssh_mock = mock.patch('pexpect.pxssh.pxssh', return_value=ssh)
ssh_mock.start() ssh_mock.start()
self.addCleanup(ssh_mock.stop) self.addCleanup(ssh_mock.stop)
conf_dict = { conf_dict = PLATFORM_SCHEMA({
CONF_PLATFORM: 'asuswrt', CONF_PLATFORM: 'asuswrt',
CONF_HOST: 'fake_host', CONF_HOST: 'fake_host',
CONF_USERNAME: 'fake_user', CONF_USERNAME: 'fake_user',
'pub_key': '/fake_path' CONF_PUB_KEY: FAKEFILE
} })
update_mock = mock.patch( update_mock = mock.patch(
'homeassistant.components.device_tracker.asuswrt.' 'homeassistant.components.device_tracker.asuswrt.'
'AsusWrtDeviceScanner.get_asuswrt_data') 'AsusWrtDeviceScanner.get_asuswrt_data')
@ -88,7 +116,7 @@ class TestComponentsDeviceTrackerASUSWRT(unittest.TestCase):
asuswrt = device_tracker.asuswrt.AsusWrtDeviceScanner(conf_dict) asuswrt = device_tracker.asuswrt.AsusWrtDeviceScanner(conf_dict)
asuswrt.ssh_connection() asuswrt.ssh_connection()
ssh.login.assert_called_once_with('fake_host', 'fake_user', ssh.login.assert_called_once_with('fake_host', 'fake_user',
ssh_key='/fake_path') ssh_key=FAKEFILE)
def test_ssh_login_with_password(self): def test_ssh_login_with_password(self):
"""Test that login is done with password when configured to.""" """Test that login is done with password when configured to."""
@ -96,12 +124,12 @@ class TestComponentsDeviceTrackerASUSWRT(unittest.TestCase):
ssh_mock = mock.patch('pexpect.pxssh.pxssh', return_value=ssh) ssh_mock = mock.patch('pexpect.pxssh.pxssh', return_value=ssh)
ssh_mock.start() ssh_mock.start()
self.addCleanup(ssh_mock.stop) self.addCleanup(ssh_mock.stop)
conf_dict = { conf_dict = PLATFORM_SCHEMA({
CONF_PLATFORM: 'asuswrt', CONF_PLATFORM: 'asuswrt',
CONF_HOST: 'fake_host', CONF_HOST: 'fake_host',
CONF_USERNAME: 'fake_user', CONF_USERNAME: 'fake_user',
CONF_PASSWORD: 'fake_pass' CONF_PASSWORD: 'fake_pass'
} })
update_mock = mock.patch( update_mock = mock.patch(
'homeassistant.components.device_tracker.asuswrt.' 'homeassistant.components.device_tracker.asuswrt.'
'AsusWrtDeviceScanner.get_asuswrt_data') 'AsusWrtDeviceScanner.get_asuswrt_data')
@ -112,23 +140,29 @@ class TestComponentsDeviceTrackerASUSWRT(unittest.TestCase):
ssh.login.assert_called_once_with('fake_host', 'fake_user', ssh.login.assert_called_once_with('fake_host', 'fake_user',
'fake_pass') 'fake_pass')
def test_ssh_login_without_password_or_pubkey(self): def test_ssh_login_without_password_or_pubkey(self): \
# pylint: disable=invalid-name
"""Test that login is not called without password or pub_key.""" """Test that login is not called without password or pub_key."""
ssh = mock.MagicMock() ssh = mock.MagicMock()
ssh_mock = mock.patch('pexpect.pxssh.pxssh', return_value=ssh) ssh_mock = mock.patch('pexpect.pxssh.pxssh', return_value=ssh)
ssh_mock.start() ssh_mock.start()
self.addCleanup(ssh_mock.stop) self.addCleanup(ssh_mock.stop)
conf_dict = { conf_dict = {
CONF_PLATFORM: 'asuswrt', CONF_PLATFORM: 'asuswrt',
CONF_HOST: 'fake_host', CONF_HOST: 'fake_host',
CONF_USERNAME: 'fake_user', CONF_USERNAME: 'fake_user',
} }
with self.assertRaises(vol.Invalid):
conf_dict = PLATFORM_SCHEMA(conf_dict)
update_mock = mock.patch( update_mock = mock.patch(
'homeassistant.components.device_tracker.asuswrt.' 'homeassistant.components.device_tracker.asuswrt.'
'AsusWrtDeviceScanner.get_asuswrt_data') 'AsusWrtDeviceScanner.get_asuswrt_data')
update_mock.start() update_mock.start()
self.addCleanup(update_mock.stop) self.addCleanup(update_mock.stop)
asuswrt = device_tracker.asuswrt.AsusWrtDeviceScanner(conf_dict)
result = asuswrt.ssh_connection() self.assertFalse(_setup_component(self.hass, DOMAIN,
{DOMAIN: conf_dict}))
ssh.login.assert_not_called() ssh.login.assert_not_called()
self.assertIsNone(result)