Cleanup old device_tracker stuff (#8627)

* Cleanup old device_tracker stuff

* Fix lint
This commit is contained in:
Pascal Vizeli 2017-07-24 16:45:02 +02:00 committed by Paulus Schoutsen
parent 654ad41464
commit f86bd15580
22 changed files with 478 additions and 679 deletions

View File

@ -7,9 +7,7 @@ https://home-assistant.io/components/device_tracker.actiontec/
import logging
import re
import telnetlib
import threading
from collections import namedtuple
from datetime import timedelta
import voluptuous as vol
import homeassistant.helpers.config_validation as cv
@ -17,9 +15,6 @@ import homeassistant.util.dt as dt_util
from homeassistant.components.device_tracker import (
DOMAIN, PLATFORM_SCHEMA, DeviceScanner)
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME
from homeassistant.util import Throttle
MIN_TIME_BETWEEN_SCANS = timedelta(seconds=5)
_LOGGER = logging.getLogger(__name__)
@ -54,7 +49,6 @@ class ActiontecDeviceScanner(DeviceScanner):
self.host = config[CONF_HOST]
self.username = config[CONF_USERNAME]
self.password = config[CONF_PASSWORD]
self.lock = threading.Lock()
self.last_results = []
data = self.get_actiontec_data()
self.success_init = data is not None
@ -74,7 +68,6 @@ class ActiontecDeviceScanner(DeviceScanner):
return client.ip
return None
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self):
"""Ensure the information from the router is up to date.
@ -84,16 +77,15 @@ class ActiontecDeviceScanner(DeviceScanner):
if not self.success_init:
return False
with self.lock:
now = dt_util.now()
actiontec_data = self.get_actiontec_data()
if not actiontec_data:
return False
self.last_results = [Device(data['mac'], name, now)
for name, data in actiontec_data.items()
if data['timevalid'] > -60]
_LOGGER.info("Scan successful")
return True
now = dt_util.now()
actiontec_data = self.get_actiontec_data()
if not actiontec_data:
return False
self.last_results = [Device(data['mac'], name, now)
for name, data in actiontec_data.items()
if data['timevalid'] > -60]
_LOGGER.info("Scan successful")
return True
def get_actiontec_data(self):
"""Retrieve data from Actiontec MI424WR and return parsed result."""

View File

@ -6,8 +6,6 @@ https://home-assistant.io/components/device_tracker.aruba/
"""
import logging
import re
import threading
from datetime import timedelta
import voluptuous as vol
@ -15,14 +13,11 @@ import homeassistant.helpers.config_validation as cv
from homeassistant.components.device_tracker import (
DOMAIN, PLATFORM_SCHEMA, DeviceScanner)
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME
from homeassistant.util import Throttle
_LOGGER = logging.getLogger(__name__)
REQUIREMENTS = ['pexpect==4.0.1']
MIN_TIME_BETWEEN_SCANS = timedelta(seconds=10)
_DEVICES_REGEX = re.compile(
r'(?P<name>([^\s]+))\s+' +
r'(?P<ip>([0-9]{1,3}[\.]){3}[0-9]{1,3})\s+' +
@ -52,8 +47,6 @@ class ArubaDeviceScanner(DeviceScanner):
self.username = config[CONF_USERNAME]
self.password = config[CONF_PASSWORD]
self.lock = threading.Lock()
self.last_results = {}
# Test the router is accessible.
@ -74,7 +67,6 @@ class ArubaDeviceScanner(DeviceScanner):
return client['name']
return None
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self):
"""Ensure the information from the Aruba Access Point is up to date.
@ -83,13 +75,12 @@ class ArubaDeviceScanner(DeviceScanner):
if not self.success_init:
return False
with self.lock:
data = self.get_aruba_data()
if not data:
return False
data = self.get_aruba_data()
if not data:
return False
self.last_results = data.values()
return True
self.last_results = data.values()
return True
def get_aruba_data(self):
"""Retrieve data from Aruba Access Point and return parsed result."""

View File

@ -8,9 +8,7 @@ import logging
import re
import socket
import telnetlib
import threading
from collections import namedtuple
from datetime import timedelta
import voluptuous as vol
@ -18,7 +16,6 @@ from homeassistant.components.device_tracker import (
DOMAIN, PLATFORM_SCHEMA, DeviceScanner)
from homeassistant.const import (
CONF_HOST, CONF_PASSWORD, CONF_USERNAME, CONF_PORT)
from homeassistant.util import Throttle
import homeassistant.helpers.config_validation as cv
REQUIREMENTS = ['pexpect==4.0.1']
@ -32,8 +29,6 @@ CONF_SSH_KEY = 'ssh_key'
DEFAULT_SSH_PORT = 22
MIN_TIME_BETWEEN_SCANS = timedelta(seconds=5)
SECRET_GROUP = 'Password or SSH Key'
PLATFORM_SCHEMA = vol.All(
@ -123,8 +118,6 @@ class AsusWrtDeviceScanner(DeviceScanner):
self.password,
self.mode == "ap")
self.lock = threading.Lock()
self.last_results = {}
# Test the router is accessible.
@ -145,7 +138,6 @@ class AsusWrtDeviceScanner(DeviceScanner):
return client['host']
return None
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self):
"""Ensure the information from the ASUSWRT router is up to date.
@ -154,19 +146,18 @@ class AsusWrtDeviceScanner(DeviceScanner):
if not self.success_init:
return False
with self.lock:
_LOGGER.info('Checking Devices')
data = self.get_asuswrt_data()
if not data:
return False
_LOGGER.info('Checking Devices')
data = self.get_asuswrt_data()
if not data:
return False
active_clients = [client for client in data.values() if
client['status'] == 'REACHABLE' or
client['status'] == 'DELAY' or
client['status'] == 'STALE' or
client['status'] == 'IN_ASSOCLIST']
self.last_results = active_clients
return True
active_clients = [client for client in data.values() if
client['status'] == 'REACHABLE' or
client['status'] == 'DELAY' or
client['status'] == 'STALE' or
client['status'] == 'IN_ASSOCLIST']
self.last_results = active_clients
return True
def get_asuswrt_data(self):
"""Retrieve data from ASUSWRT and return parsed result."""

View File

@ -6,8 +6,6 @@ https://home-assistant.io/components/device_tracker.bt_home_hub_5/
"""
import logging
import re
import threading
from datetime import timedelta
import xml.etree.ElementTree as ET
import json
from urllib.parse import unquote
@ -19,13 +17,10 @@ import homeassistant.helpers.config_validation as cv
from homeassistant.components.device_tracker import (
DOMAIN, PLATFORM_SCHEMA, DeviceScanner)
from homeassistant.const import CONF_HOST
from homeassistant.util import Throttle
_LOGGER = logging.getLogger(__name__)
_MAC_REGEX = re.compile(r'(([0-9A-Fa-f]{1,2}\:){5}[0-9A-Fa-f]{1,2})')
MIN_TIME_BETWEEN_SCANS = timedelta(seconds=10)
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_HOST): cv.string
})
@ -46,11 +41,7 @@ class BTHomeHub5DeviceScanner(DeviceScanner):
"""Initialise the scanner."""
_LOGGER.info("Initialising BT Home Hub 5")
self.host = config.get(CONF_HOST, '192.168.1.254')
self.lock = threading.Lock()
self.last_results = {}
self.url = 'http://{}/nonAuth/home_status.xml'.format(self.host)
# Test the router is accessible
@ -65,17 +56,15 @@ class BTHomeHub5DeviceScanner(DeviceScanner):
def get_device_name(self, device):
"""Return the name of the given device or None if we don't know."""
with self.lock:
# If not initialised and not already scanned and not found.
if device not in self.last_results:
self._update_info()
# If not initialised and not already scanned and not found.
if device not in self.last_results:
self._update_info()
if not self.last_results:
return None
if not self.last_results:
return None
return self.last_results.get(device)
return self.last_results.get(device)
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self):
"""Ensure the information from the BT Home Hub 5 is up to date.
@ -84,18 +73,17 @@ class BTHomeHub5DeviceScanner(DeviceScanner):
if not self.success_init:
return False
with self.lock:
_LOGGER.info("Scanning")
_LOGGER.info("Scanning")
data = _get_homehub_data(self.url)
data = _get_homehub_data(self.url)
if not data:
_LOGGER.warning("Error scanning devices")
return False
if not data:
_LOGGER.warning("Error scanning devices")
return False
self.last_results = data
self.last_results = data
return True
return True
def _get_homehub_data(url):

View File

@ -5,7 +5,6 @@ For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/device_tracker.cisco_ios/
"""
import logging
from datetime import timedelta
import voluptuous as vol
@ -14,9 +13,6 @@ from homeassistant.components.device_tracker import (
DOMAIN, PLATFORM_SCHEMA, DeviceScanner)
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME, \
CONF_PORT
from homeassistant.util import Throttle
MIN_TIME_BETWEEN_SCANS = timedelta(seconds=5)
_LOGGER = logging.getLogger(__name__)
@ -65,7 +61,6 @@ class CiscoDeviceScanner(DeviceScanner):
return self.last_results
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self):
"""
Ensure the information from the Cisco router is up to date.

View File

@ -6,8 +6,6 @@ https://home-assistant.io/components/device_tracker.ddwrt/
"""
import logging
import re
import threading
from datetime import timedelta
import requests
import voluptuous as vol
@ -16,9 +14,6 @@ import homeassistant.helpers.config_validation as cv
from homeassistant.components.device_tracker import (
DOMAIN, PLATFORM_SCHEMA, DeviceScanner)
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME
from homeassistant.util import Throttle
MIN_TIME_BETWEEN_SCANS = timedelta(seconds=5)
_LOGGER = logging.getLogger(__name__)
@ -50,8 +45,6 @@ class DdWrtDeviceScanner(DeviceScanner):
self.username = config[CONF_USERNAME]
self.password = config[CONF_PASSWORD]
self.lock = threading.Lock()
self.last_results = {}
self.mac2name = {}
@ -69,68 +62,65 @@ class DdWrtDeviceScanner(DeviceScanner):
def get_device_name(self, device):
"""Return the name of the given device or None if we don't know."""
with self.lock:
# If not initialised and not already scanned and not found.
if device not in self.mac2name:
url = 'http://{}/Status_Lan.live.asp'.format(self.host)
data = self.get_ddwrt_data(url)
# If not initialised and not already scanned and not found.
if device not in self.mac2name:
url = 'http://{}/Status_Lan.live.asp'.format(self.host)
data = self.get_ddwrt_data(url)
if not data:
return None
if not data:
return None
dhcp_leases = data.get('dhcp_leases', None)
dhcp_leases = data.get('dhcp_leases', None)
if not dhcp_leases:
return None
if not dhcp_leases:
return None
# Remove leading and trailing quotes and spaces
cleaned_str = dhcp_leases.replace(
"\"", "").replace("\'", "").replace(" ", "")
elements = cleaned_str.split(',')
num_clients = int(len(elements) / 5)
self.mac2name = {}
for idx in range(0, num_clients):
# The data is a single array
# every 5 elements represents one host, the MAC
# is the third element and the name is the first.
mac_index = (idx * 5) + 2
if mac_index < len(elements):
mac = elements[mac_index]
self.mac2name[mac] = elements[idx * 5]
# Remove leading and trailing quotes and spaces
cleaned_str = dhcp_leases.replace(
"\"", "").replace("\'", "").replace(" ", "")
elements = cleaned_str.split(',')
num_clients = int(len(elements) / 5)
self.mac2name = {}
for idx in range(0, num_clients):
# The data is a single array
# every 5 elements represents one host, the MAC
# is the third element and the name is the first.
mac_index = (idx * 5) + 2
if mac_index < len(elements):
mac = elements[mac_index]
self.mac2name[mac] = elements[idx * 5]
return self.mac2name.get(device)
return self.mac2name.get(device)
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self):
"""Ensure the information from the DD-WRT router is up to date.
Return boolean if scanning successful.
"""
with self.lock:
_LOGGER.info("Checking ARP")
_LOGGER.info("Checking ARP")
url = 'http://{}/Status_Wireless.live.asp'.format(self.host)
data = self.get_ddwrt_data(url)
url = 'http://{}/Status_Wireless.live.asp'.format(self.host)
data = self.get_ddwrt_data(url)
if not data:
return False
if not data:
return False
self.last_results = []
self.last_results = []
active_clients = data.get('active_wireless', None)
if not active_clients:
return False
active_clients = data.get('active_wireless', None)
if not active_clients:
return False
# The DD-WRT UI uses its own data format and then
# regex's out values so this is done here too
# Remove leading and trailing single quotes.
clean_str = active_clients.strip().strip("'")
elements = clean_str.split("','")
# The DD-WRT UI uses its own data format and then
# regex's out values so this is done here too
# Remove leading and trailing single quotes.
clean_str = active_clients.strip().strip("'")
elements = clean_str.split("','")
self.last_results.extend(item for item in elements
if _MAC_REGEX.match(item))
self.last_results.extend(item for item in elements
if _MAC_REGEX.match(item))
return True
return True
def get_ddwrt_data(self, url):
"""Retrieve data from DD-WRT and return parsed result."""

View File

@ -5,7 +5,6 @@ For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/device_tracker.fritz/
"""
import logging
from datetime import timedelta
import voluptuous as vol
@ -13,12 +12,9 @@ import homeassistant.helpers.config_validation as cv
from homeassistant.components.device_tracker import (
DOMAIN, PLATFORM_SCHEMA, DeviceScanner)
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME
from homeassistant.util import Throttle
REQUIREMENTS = ['fritzconnection==0.6.3']
MIN_TIME_BETWEEN_SCANS = timedelta(seconds=5)
_LOGGER = logging.getLogger(__name__)
CONF_DEFAULT_IP = '169.254.1.1' # This IP is valid for all FRITZ!Box routers.
@ -88,7 +84,6 @@ class FritzBoxScanner(DeviceScanner):
return None
return ret
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self):
"""Retrieve latest information from the FRITZ!Box."""
if not self.success_init:

View File

@ -6,8 +6,6 @@ https://home-assistant.io/components/device_tracker.linksys_ap/
"""
import base64
import logging
import threading
from datetime import timedelta
import requests
import voluptuous as vol
@ -16,9 +14,7 @@ import homeassistant.helpers.config_validation as cv
from homeassistant.components.device_tracker import DOMAIN, PLATFORM_SCHEMA
from homeassistant.const import (
CONF_HOST, CONF_PASSWORD, CONF_USERNAME, CONF_VERIFY_SSL)
from homeassistant.util import Throttle
MIN_TIME_BETWEEN_SCANS = timedelta(seconds=5)
INTERFACES = 2
DEFAULT_TIMEOUT = 10
@ -51,8 +47,6 @@ class LinksysAPDeviceScanner(object):
self.username = config[CONF_USERNAME]
self.password = config[CONF_PASSWORD]
self.verify_ssl = config[CONF_VERIFY_SSL]
self.lock = threading.Lock()
self.last_results = []
# Check if the access point is accessible
@ -76,24 +70,22 @@ class LinksysAPDeviceScanner(object):
"""
return None
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self):
"""Check for connected devices."""
from bs4 import BeautifulSoup as BS
with self.lock:
_LOGGER.info("Checking Linksys AP")
_LOGGER.info("Checking Linksys AP")
self.last_results = []
for interface in range(INTERFACES):
request = self._make_request(interface)
self.last_results.extend(
[x.find_all('td')[1].text
for x in BS(request.content, "html.parser")
.find_all(class_='section-row')]
)
self.last_results = []
for interface in range(INTERFACES):
request = self._make_request(interface)
self.last_results.extend(
[x.find_all('td')[1].text
for x in BS(request.content, "html.parser")
.find_all(class_='section-row')]
)
return True
return True
def _make_request(self, unit=0):
# No, the '&&' is not a typo - this is expected by the web interface.

View File

@ -1,7 +1,5 @@
"""Support for Linksys Smart Wifi routers."""
import logging
import threading
from datetime import timedelta
import requests
import voluptuous as vol
@ -10,9 +8,7 @@ import homeassistant.helpers.config_validation as cv
from homeassistant.components.device_tracker import (
DOMAIN, PLATFORM_SCHEMA, DeviceScanner)
from homeassistant.const import CONF_HOST
from homeassistant.util import Throttle
MIN_TIME_BETWEEN_SCANS = timedelta(seconds=5)
DEFAULT_TIMEOUT = 10
_LOGGER = logging.getLogger(__name__)
@ -36,8 +32,6 @@ class LinksysSmartWifiDeviceScanner(DeviceScanner):
def __init__(self, config):
"""Initialize the scanner."""
self.host = config[CONF_HOST]
self.lock = threading.Lock()
self.last_results = {}
# Check if the access point is accessible
@ -55,48 +49,46 @@ class LinksysSmartWifiDeviceScanner(DeviceScanner):
"""Return the name (if known) of the device."""
return self.last_results.get(mac)
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self):
"""Check for connected devices."""
with self.lock:
_LOGGER.info("Checking Linksys Smart Wifi")
_LOGGER.info("Checking Linksys Smart Wifi")
self.last_results = {}
response = self._make_request()
if response.status_code != 200:
_LOGGER.error(
"Got HTTP status code %d when getting device list",
response.status_code)
return False
try:
data = response.json()
result = data["responses"][0]
devices = result["output"]["devices"]
for device in devices:
macs = device["knownMACAddresses"]
if not macs:
_LOGGER.warning(
"Skipping device without known MAC address")
continue
mac = macs[-1]
connections = device["connections"]
if not connections:
_LOGGER.debug("Device %s is not connected", mac)
continue
self.last_results = {}
response = self._make_request()
if response.status_code != 200:
_LOGGER.error(
"Got HTTP status code %d when getting device list",
response.status_code)
return False
try:
data = response.json()
result = data["responses"][0]
devices = result["output"]["devices"]
for device in devices:
macs = device["knownMACAddresses"]
if not macs:
_LOGGER.warning(
"Skipping device without known MAC address")
continue
mac = macs[-1]
connections = device["connections"]
if not connections:
_LOGGER.debug("Device %s is not connected", mac)
continue
name = None
for prop in device["properties"]:
if prop["name"] == "userDeviceName":
name = prop["value"]
if not name:
name = device.get("friendlyName", device["deviceID"])
name = None
for prop in device["properties"]:
if prop["name"] == "userDeviceName":
name = prop["value"]
if not name:
name = device.get("friendlyName", device["deviceID"])
_LOGGER.debug("Device %s is connected", mac)
self.last_results[mac] = name
except (KeyError, IndexError):
_LOGGER.exception("Router returned unexpected response")
return False
return True
_LOGGER.debug("Device %s is connected", mac)
self.last_results[mac] = name
except (KeyError, IndexError):
_LOGGER.exception("Router returned unexpected response")
return False
return True
def _make_request(self):
# Weirdly enough, this doesn't seem to require authentication

View File

@ -7,8 +7,6 @@ https://home-assistant.io/components/device_tracker.luci/
import json
import logging
import re
import threading
from datetime import timedelta
import requests
import voluptuous as vol
@ -18,9 +16,6 @@ from homeassistant.exceptions import HomeAssistantError
from homeassistant.components.device_tracker import (
DOMAIN, PLATFORM_SCHEMA, DeviceScanner)
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME
from homeassistant.util import Throttle
MIN_TIME_BETWEEN_SCANS = timedelta(seconds=5)
_LOGGER = logging.getLogger(__name__)
@ -55,12 +50,8 @@ class LuciDeviceScanner(DeviceScanner):
self.parse_api_pattern = re.compile(r"(?P<param>\w*) = (?P<value>.*);")
self.lock = threading.Lock()
self.last_results = {}
self.refresh_token()
self.mac2name = None
self.success_init = self.token is not None
@ -75,24 +66,22 @@ class LuciDeviceScanner(DeviceScanner):
def get_device_name(self, device):
"""Return the name of the given device or None if we don't know."""
with self.lock:
if self.mac2name is None:
url = 'http://{}/cgi-bin/luci/rpc/uci'.format(self.host)
result = _req_json_rpc(url, 'get_all', 'dhcp',
params={'auth': self.token})
if result:
hosts = [x for x in result.values()
if x['.type'] == 'host' and
'mac' in x and 'name' in x]
mac2name_list = [
(x['mac'].upper(), x['name']) for x in hosts]
self.mac2name = dict(mac2name_list)
else:
# Error, handled in the _req_json_rpc
return
return self.mac2name.get(device.upper(), None)
if self.mac2name is None:
url = 'http://{}/cgi-bin/luci/rpc/uci'.format(self.host)
result = _req_json_rpc(url, 'get_all', 'dhcp',
params={'auth': self.token})
if result:
hosts = [x for x in result.values()
if x['.type'] == 'host' and
'mac' in x and 'name' in x]
mac2name_list = [
(x['mac'].upper(), x['name']) for x in hosts]
self.mac2name = dict(mac2name_list)
else:
# Error, handled in the _req_json_rpc
return
return self.mac2name.get(device.upper(), None)
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self):
"""Ensure the information from the Luci router is up to date.
@ -101,31 +90,30 @@ class LuciDeviceScanner(DeviceScanner):
if not self.success_init:
return False
with self.lock:
_LOGGER.info("Checking ARP")
_LOGGER.info("Checking ARP")
url = 'http://{}/cgi-bin/luci/rpc/sys'.format(self.host)
try:
result = _req_json_rpc(url, 'net.arptable',
params={'auth': self.token})
except InvalidLuciTokenError:
_LOGGER.info("Refreshing token")
self.refresh_token()
return False
if result:
self.last_results = []
for device_entry in result:
# Check if the Flags for each device contain
# NUD_REACHABLE and if so, add it to last_results
if int(device_entry['Flags'], 16) & 0x2:
self.last_results.append(device_entry['HW address'])
return True
url = 'http://{}/cgi-bin/luci/rpc/sys'.format(self.host)
try:
result = _req_json_rpc(url, 'net.arptable',
params={'auth': self.token})
except InvalidLuciTokenError:
_LOGGER.info("Refreshing token")
self.refresh_token()
return False
if result:
self.last_results = []
for device_entry in result:
# Check if the Flags for each device contain
# NUD_REACHABLE and if so, add it to last_results
if int(device_entry['Flags'], 16) & 0x2:
self.last_results.append(device_entry['HW address'])
return True
return False
def _req_json_rpc(url, method, *args, **kwargs):
"""Perform one JSON RPC operation."""

View File

@ -5,25 +5,17 @@ For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/device_tracker.mikrotik/
"""
import logging
import threading
from datetime import timedelta
import voluptuous as vol
import homeassistant.helpers.config_validation as cv
from homeassistant.components.device_tracker import (
DOMAIN, PLATFORM_SCHEMA, DeviceScanner)
from homeassistant.const import (CONF_HOST,
CONF_PASSWORD,
CONF_USERNAME,
CONF_PORT)
from homeassistant.util import Throttle
from homeassistant.const import (
CONF_HOST, CONF_PASSWORD, CONF_USERNAME, CONF_PORT)
REQUIREMENTS = ['librouteros==1.0.2']
# Return cached results if last scan was less then this time ago.
MIN_TIME_BETWEEN_SCANS = timedelta(seconds=10)
MTK_DEFAULT_API_PORT = '8728'
_LOGGER = logging.getLogger(__name__)
@ -54,12 +46,9 @@ class MikrotikScanner(DeviceScanner):
self.username = config[CONF_USERNAME]
self.password = config[CONF_PASSWORD]
self.lock = threading.Lock()
self.connected = False
self.success_init = False
self.client = None
self.wireless_exist = None
self.success_init = self.connect_to_device()
@ -118,51 +107,48 @@ class MikrotikScanner(DeviceScanner):
def get_device_name(self, mac):
"""Return the name of the given device or None if we don't know."""
with self.lock:
return self.last_results.get(mac)
return self.last_results.get(mac)
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self):
"""Retrieve latest information from the Mikrotik box."""
with self.lock:
if self.wireless_exist:
devices_tracker = 'wireless'
else:
devices_tracker = 'ip'
if self.wireless_exist:
devices_tracker = 'wireless'
else:
devices_tracker = 'ip'
_LOGGER.info(
"Loading %s devices from Mikrotik (%s) ...",
devices_tracker,
self.host
_LOGGER.info(
"Loading %s devices from Mikrotik (%s) ...",
devices_tracker,
self.host
)
device_names = self.client(cmd='/ip/dhcp-server/lease/getall')
if self.wireless_exist:
devices = self.client(
cmd='/interface/wireless/registration-table/getall'
)
else:
devices = device_names
device_names = self.client(cmd='/ip/dhcp-server/lease/getall')
if self.wireless_exist:
devices = self.client(
cmd='/interface/wireless/registration-table/getall'
)
else:
devices = device_names
if device_names is None and devices is None:
return False
if device_names is None and devices is None:
return False
mac_names = {device.get('mac-address'): device.get('host-name')
for device in device_names
if device.get('mac-address')}
mac_names = {device.get('mac-address'): device.get('host-name')
for device in device_names
if device.get('mac-address')}
if self.wireless_exist:
self.last_results = {
device.get('mac-address'):
mac_names.get(device.get('mac-address'))
for device in devices
}
else:
self.last_results = {
device.get('mac-address'):
mac_names.get(device.get('mac-address'))
for device in device_names
if device.get('active-address')
}
if self.wireless_exist:
self.last_results = {
device.get('mac-address'):
mac_names.get(device.get('mac-address'))
for device in devices
}
else:
self.last_results = {
device.get('mac-address'):
mac_names.get(device.get('mac-address'))
for device in device_names
if device.get('active-address')
}
return True
return True

View File

@ -5,8 +5,6 @@ For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/device_tracker.netgear/
"""
import logging
import threading
from datetime import timedelta
import voluptuous as vol
@ -15,14 +13,11 @@ from homeassistant.components.device_tracker import (
DOMAIN, PLATFORM_SCHEMA, DeviceScanner)
from homeassistant.const import (
CONF_HOST, CONF_PASSWORD, CONF_USERNAME, CONF_PORT)
from homeassistant.util import Throttle
REQUIREMENTS = ['pynetgear==0.3.3']
_LOGGER = logging.getLogger(__name__)
MIN_TIME_BETWEEN_SCANS = timedelta(seconds=5)
DEFAULT_HOST = 'routerlogin.net'
DEFAULT_USER = 'admin'
DEFAULT_PORT = 5000
@ -56,8 +51,6 @@ class NetgearDeviceScanner(DeviceScanner):
import pynetgear
self.last_results = []
self.lock = threading.Lock()
self._api = pynetgear.Netgear(password, host, username, port)
_LOGGER.info("Logging in")
@ -85,7 +78,6 @@ class NetgearDeviceScanner(DeviceScanner):
except StopIteration:
return None
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self):
"""Retrieve latest information from the Netgear router.
@ -94,12 +86,11 @@ class NetgearDeviceScanner(DeviceScanner):
if not self.success_init:
return
with self.lock:
_LOGGER.info("Scanning")
_LOGGER.info("Scanning")
results = self._api.get_attached_devices()
results = self._api.get_attached_devices()
if results is None:
_LOGGER.warning("Error scanning devices")
if results is None:
_LOGGER.warning("Error scanning devices")
self.last_results = results or []
self.last_results = results or []

View File

@ -4,11 +4,11 @@ Support for scanning a network with nmap.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/device_tracker.nmap_tracker/
"""
from datetime import timedelta
import logging
import re
import subprocess
from collections import namedtuple
from datetime import timedelta
import voluptuous as vol
@ -17,7 +17,6 @@ import homeassistant.util.dt as dt_util
from homeassistant.components.device_tracker import (
DOMAIN, PLATFORM_SCHEMA, DeviceScanner)
from homeassistant.const import CONF_HOSTS
from homeassistant.util import Throttle
REQUIREMENTS = ['python-nmap==0.6.1']
@ -29,8 +28,6 @@ CONF_HOME_INTERVAL = 'home_interval'
CONF_OPTIONS = 'scan_options'
DEFAULT_OPTIONS = '-F --host-timeout 5s'
MIN_TIME_BETWEEN_SCANS = timedelta(seconds=5)
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_HOSTS): cv.ensure_list,
@ -97,7 +94,6 @@ class NmapDeviceScanner(DeviceScanner):
return filter_named[0]
return None
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self):
"""Scan the network for devices.

View File

@ -6,8 +6,6 @@ https://home-assistant.io/components/device_tracker.sky_hub/
"""
import logging
import re
import threading
from datetime import timedelta
import requests
import voluptuous as vol
@ -16,13 +14,10 @@ import homeassistant.helpers.config_validation as cv
from homeassistant.components.device_tracker import (
DOMAIN, PLATFORM_SCHEMA, DeviceScanner)
from homeassistant.const import CONF_HOST
from homeassistant.util import Throttle
_LOGGER = logging.getLogger(__name__)
_MAC_REGEX = re.compile(r'(([0-9A-Fa-f]{1,2}\:){5}[0-9A-Fa-f]{1,2})')
MIN_TIME_BETWEEN_SCANS = timedelta(seconds=10)
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_HOST): cv.string
})
@ -43,11 +38,7 @@ class SkyHubDeviceScanner(DeviceScanner):
"""Initialise the scanner."""
_LOGGER.info("Initialising Sky Hub")
self.host = config.get(CONF_HOST, '192.168.1.254')
self.lock = threading.Lock()
self.last_results = {}
self.url = 'http://{}/'.format(self.host)
# Test the router is accessible
@ -62,17 +53,15 @@ class SkyHubDeviceScanner(DeviceScanner):
def get_device_name(self, device):
"""Return the name of the given device or None if we don't know."""
with self.lock:
# If not initialised and not already scanned and not found.
if device not in self.last_results:
self._update_info()
# If not initialised and not already scanned and not found.
if device not in self.last_results:
self._update_info()
if not self.last_results:
return None
if not self.last_results:
return None
return self.last_results.get(device)
return self.last_results.get(device)
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self):
"""Ensure the information from the Sky Hub is up to date.
@ -81,18 +70,17 @@ class SkyHubDeviceScanner(DeviceScanner):
if not self.success_init:
return False
with self.lock:
_LOGGER.info("Scanning")
_LOGGER.info("Scanning")
data = _get_skyhub_data(self.url)
data = _get_skyhub_data(self.url)
if not data:
_LOGGER.warning('Error scanning devices')
return False
if not data:
_LOGGER.warning('Error scanning devices')
return False
self.last_results = data
self.last_results = data
return True
return True
def _get_skyhub_data(url):

View File

@ -6,8 +6,6 @@ https://home-assistant.io/components/device_tracker.snmp/
"""
import binascii
import logging
import threading
from datetime import timedelta
import voluptuous as vol
@ -15,7 +13,6 @@ import homeassistant.helpers.config_validation as cv
from homeassistant.components.device_tracker import (
DOMAIN, PLATFORM_SCHEMA, DeviceScanner)
from homeassistant.const import CONF_HOST
from homeassistant.util import Throttle
_LOGGER = logging.getLogger(__name__)
@ -28,8 +25,6 @@ CONF_BASEOID = 'baseoid'
DEFAULT_COMMUNITY = 'public'
MIN_TIME_BETWEEN_SCANS = timedelta(seconds=10)
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_HOST): cv.string,
vol.Optional(CONF_COMMUNITY, default=DEFAULT_COMMUNITY): cv.string,
@ -68,9 +63,6 @@ class SnmpScanner(DeviceScanner):
privProtocol=cfg.usmAesCfb128Protocol
)
self.baseoid = cmdgen.MibVariable(config[CONF_BASEOID])
self.lock = threading.Lock()
self.last_results = []
# Test the router is accessible
@ -90,7 +82,6 @@ class SnmpScanner(DeviceScanner):
# We have no names
return None
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self):
"""Ensure the information from the device is up to date.
@ -99,13 +90,12 @@ class SnmpScanner(DeviceScanner):
if not self.success_init:
return False
with self.lock:
data = self.get_snmp_data()
if not data:
return False
data = self.get_snmp_data()
if not data:
return False
self.last_results = data
return True
self.last_results = data
return True
def get_snmp_data(self):
"""Fetch MAC addresses from access point via SNMP."""

View File

@ -5,8 +5,6 @@ For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/device_tracker.swisscom/
"""
import logging
import threading
from datetime import timedelta
import requests
import voluptuous as vol
@ -15,9 +13,6 @@ import homeassistant.helpers.config_validation as cv
from homeassistant.components.device_tracker import (
DOMAIN, PLATFORM_SCHEMA, DeviceScanner)
from homeassistant.const import CONF_HOST
from homeassistant.util import Throttle
MIN_TIME_BETWEEN_SCANS = timedelta(seconds=5)
_LOGGER = logging.getLogger(__name__)
@ -41,9 +36,6 @@ class SwisscomDeviceScanner(DeviceScanner):
def __init__(self, config):
"""Initialize the scanner."""
self.host = config[CONF_HOST]
self.lock = threading.Lock()
self.last_results = {}
# Test the router is accessible.
@ -64,7 +56,6 @@ class SwisscomDeviceScanner(DeviceScanner):
return client['host']
return None
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self):
"""Ensure the information from the Swisscom router is up to date.
@ -73,16 +64,15 @@ class SwisscomDeviceScanner(DeviceScanner):
if not self.success_init:
return False
with self.lock:
_LOGGER.info("Loading data from Swisscom Internet Box")
data = self.get_swisscom_data()
if not data:
return False
_LOGGER.info("Loading data from Swisscom Internet Box")
data = self.get_swisscom_data()
if not data:
return False
active_clients = [client for client in data.values() if
client['status']]
self.last_results = active_clients
return True
active_clients = [client for client in data.values() if
client['status']]
self.last_results = active_clients
return True
def get_swisscom_data(self):
"""Retrieve data from Swisscom and return parsed result."""

View File

@ -7,8 +7,6 @@ https://home-assistant.io/components/device_tracker.thomson/
import logging
import re
import telnetlib
import threading
from datetime import timedelta
import voluptuous as vol
@ -16,9 +14,6 @@ import homeassistant.helpers.config_validation as cv
from homeassistant.components.device_tracker import (
DOMAIN, PLATFORM_SCHEMA, DeviceScanner)
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME
from homeassistant.util import Throttle
MIN_TIME_BETWEEN_SCANS = timedelta(seconds=10)
_LOGGER = logging.getLogger(__name__)
@ -54,9 +49,6 @@ class ThomsonDeviceScanner(DeviceScanner):
self.host = config[CONF_HOST]
self.username = config[CONF_USERNAME]
self.password = config[CONF_PASSWORD]
self.lock = threading.Lock()
self.last_results = {}
# Test the router is accessible.
@ -77,7 +69,6 @@ class ThomsonDeviceScanner(DeviceScanner):
return client['host']
return None
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self):
"""Ensure the information from the THOMSON router is up to date.
@ -86,17 +77,16 @@ class ThomsonDeviceScanner(DeviceScanner):
if not self.success_init:
return False
with self.lock:
_LOGGER.info("Checking ARP")
data = self.get_thomson_data()
if not data:
return False
_LOGGER.info("Checking ARP")
data = self.get_thomson_data()
if not data:
return False
# Flag C stands for CONNECTED
active_clients = [client for client in data.values() if
client['status'].find('C') != -1]
self.last_results = active_clients
return True
# Flag C stands for CONNECTED
active_clients = [client for client in data.values() if
client['status'].find('C') != -1]
self.last_results = active_clients
return True
def get_thomson_data(self):
"""Retrieve data from THOMSON and return parsed result."""

View File

@ -7,8 +7,6 @@ https://home-assistant.io/components/device_tracker.tomato/
import json
import logging
import re
import threading
from datetime import timedelta
import requests
import voluptuous as vol
@ -17,9 +15,6 @@ import homeassistant.helpers.config_validation as cv
from homeassistant.components.device_tracker import (
DOMAIN, PLATFORM_SCHEMA, DeviceScanner)
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME
from homeassistant.util import Throttle
MIN_TIME_BETWEEN_SCANS = timedelta(seconds=5)
CONF_HTTP_ID = 'http_id'
@ -54,8 +49,6 @@ class TomatoDeviceScanner(DeviceScanner):
self.parse_api_pattern = re.compile(r"(?P<param>\w*) = (?P<value>.*);")
self.logger = logging.getLogger("{}.{}".format(__name__, "Tomato"))
self.lock = threading.Lock()
self.last_results = {"wldev": [], "dhcpd_lease": []}
self.success_init = self._update_tomato_info()
@ -76,50 +69,48 @@ class TomatoDeviceScanner(DeviceScanner):
return filter_named[0]
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_tomato_info(self):
"""Ensure the information from the Tomato router is up to date.
Return boolean if scanning successful.
"""
with self.lock:
self.logger.info("Scanning")
self.logger.info("Scanning")
try:
response = requests.Session().send(self.req, timeout=3)
# Calling and parsing the Tomato api here. We only need the
# wldev and dhcpd_lease values.
if response.status_code == 200:
try:
response = requests.Session().send(self.req, timeout=3)
# Calling and parsing the Tomato api here. We only need the
# wldev and dhcpd_lease values.
if response.status_code == 200:
for param, value in \
self.parse_api_pattern.findall(response.text):
for param, value in \
self.parse_api_pattern.findall(response.text):
if param == 'wldev' or param == 'dhcpd_lease':
self.last_results[param] = \
json.loads(value.replace("'", '"'))
return True
if param == 'wldev' or param == 'dhcpd_lease':
self.last_results[param] = \
json.loads(value.replace("'", '"'))
return True
elif response.status_code == 401:
# Authentication error
self.logger.exception((
"Failed to authenticate, "
"please check your username and password"))
return False
except requests.exceptions.ConnectionError:
# We get this if we could not connect to the router or
# an invalid http_id was supplied.
self.logger.exception("Failed to connect to the router or "
"invalid http_id supplied")
elif response.status_code == 401:
# Authentication error
self.logger.exception((
"Failed to authenticate, "
"please check your username and password"))
return False
except requests.exceptions.Timeout:
# We get this if we could not connect to the router or
# an invalid http_id was supplied.
self.logger.exception("Connection to the router timed out")
return False
except requests.exceptions.ConnectionError:
# We get this if we could not connect to the router or
# an invalid http_id was supplied.
self.logger.exception("Failed to connect to the router or "
"invalid http_id supplied")
return False
except ValueError:
# If JSON decoder could not parse the response.
self.logger.exception("Failed to parse response from router")
return False
except requests.exceptions.Timeout:
# We get this if we could not connect to the router or
# an invalid http_id was supplied.
self.logger.exception("Connection to the router timed out")
return False
except ValueError:
# If JSON decoder could not parse the response.
self.logger.exception("Failed to parse response from router")
return False

View File

@ -8,8 +8,7 @@ import base64
import hashlib
import logging
import re
import threading
from datetime import timedelta, datetime
from datetime import datetime
import requests
import voluptuous as vol
@ -18,9 +17,6 @@ import homeassistant.helpers.config_validation as cv
from homeassistant.components.device_tracker import (
DOMAIN, PLATFORM_SCHEMA, DeviceScanner)
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME
from homeassistant.util import Throttle
MIN_TIME_BETWEEN_SCANS = timedelta(seconds=5)
_LOGGER = logging.getLogger(__name__)
@ -59,7 +55,6 @@ class TplinkDeviceScanner(DeviceScanner):
self.password = password
self.last_results = {}
self.lock = threading.Lock()
self.success_init = self._update_info()
def scan_devices(self):
@ -72,28 +67,26 @@ class TplinkDeviceScanner(DeviceScanner):
"""Get firmware doesn't save the name of the wireless device."""
return None
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self):
"""Ensure the information from the TP-Link router is up to date.
Return boolean if scanning successful.
"""
with self.lock:
_LOGGER.info("Loading wireless clients...")
_LOGGER.info("Loading wireless clients...")
url = 'http://{}/userRpm/WlanStationRpm.htm'.format(self.host)
referer = 'http://{}'.format(self.host)
page = requests.get(
url, auth=(self.username, self.password),
headers={'referer': referer}, timeout=4)
url = 'http://{}/userRpm/WlanStationRpm.htm'.format(self.host)
referer = 'http://{}'.format(self.host)
page = requests.get(
url, auth=(self.username, self.password),
headers={'referer': referer}, timeout=4)
result = self.parse_macs.findall(page.text)
result = self.parse_macs.findall(page.text)
if result:
self.last_results = [mac.replace("-", ":") for mac in result]
return True
if result:
self.last_results = [mac.replace("-", ":") for mac in result]
return True
return False
return False
class Tplink2DeviceScanner(TplinkDeviceScanner):
@ -109,48 +102,46 @@ class Tplink2DeviceScanner(TplinkDeviceScanner):
"""Get firmware doesn't save the name of the wireless device."""
return self.last_results.get(device)
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self):
"""Ensure the information from the TP-Link router is up to date.
Return boolean if scanning successful.
"""
with self.lock:
_LOGGER.info("Loading wireless clients...")
_LOGGER.info("Loading wireless clients...")
url = 'http://{}/data/map_access_wireless_client_grid.json' \
.format(self.host)
referer = 'http://{}'.format(self.host)
url = 'http://{}/data/map_access_wireless_client_grid.json' \
.format(self.host)
referer = 'http://{}'.format(self.host)
# Router uses Authorization cookie instead of header
# Let's create the cookie
username_password = '{}:{}'.format(self.username, self.password)
b64_encoded_username_password = base64.b64encode(
username_password.encode('ascii')
).decode('ascii')
cookie = 'Authorization=Basic {}' \
.format(b64_encoded_username_password)
# Router uses Authorization cookie instead of header
# Let's create the cookie
username_password = '{}:{}'.format(self.username, self.password)
b64_encoded_username_password = base64.b64encode(
username_password.encode('ascii')
).decode('ascii')
cookie = 'Authorization=Basic {}' \
.format(b64_encoded_username_password)
response = requests.post(
url, headers={'referer': referer, 'cookie': cookie},
timeout=4)
try:
result = response.json().get('data')
except ValueError:
_LOGGER.error("Router didn't respond with JSON. "
"Check if credentials are correct.")
return False
if result:
self.last_results = {
device['mac_addr'].replace('-', ':'): device['name']
for device in result
}
return True
response = requests.post(
url, headers={'referer': referer, 'cookie': cookie},
timeout=4)
try:
result = response.json().get('data')
except ValueError:
_LOGGER.error("Router didn't respond with JSON. "
"Check if credentials are correct.")
return False
if result:
self.last_results = {
device['mac_addr'].replace('-', ':'): device['name']
for device in result
}
return True
return False
class Tplink3DeviceScanner(TplinkDeviceScanner):
"""This class queries the Archer C9 router with version 150811 or high."""
@ -202,70 +193,67 @@ class Tplink3DeviceScanner(TplinkDeviceScanner):
response.text)
return False
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self):
"""Ensure the information from the TP-Link router is up to date.
Return boolean if scanning successful.
"""
with self.lock:
if (self.stok == '') or (self.sysauth == ''):
self._get_auth_tokens()
if (self.stok == '') or (self.sysauth == ''):
self._get_auth_tokens()
_LOGGER.info("Loading wireless clients...")
_LOGGER.info("Loading wireless clients...")
url = ('http://{}/cgi-bin/luci/;stok={}/admin/wireless?'
'form=statistics').format(self.host, self.stok)
referer = 'http://{}/webpages/index.html'.format(self.host)
url = ('http://{}/cgi-bin/luci/;stok={}/admin/wireless?'
'form=statistics').format(self.host, self.stok)
referer = 'http://{}/webpages/index.html'.format(self.host)
response = requests.post(url,
params={'operation': 'load'},
headers={'referer': referer},
cookies={'sysauth': self.sysauth},
timeout=5)
response = requests.post(url,
params={'operation': 'load'},
headers={'referer': referer},
cookies={'sysauth': self.sysauth},
timeout=5)
try:
json_response = response.json()
try:
json_response = response.json()
if json_response.get('success'):
result = response.json().get('data')
else:
if json_response.get('errorcode') == 'timeout':
_LOGGER.info("Token timed out. Relogging on next scan")
self.stok = ''
self.sysauth = ''
return False
_LOGGER.error(
"An unknown error happened while fetching data")
if json_response.get('success'):
result = response.json().get('data')
else:
if json_response.get('errorcode') == 'timeout':
_LOGGER.info("Token timed out. Relogging on next scan")
self.stok = ''
self.sysauth = ''
return False
except ValueError:
_LOGGER.error("Router didn't respond with JSON. "
"Check if credentials are correct")
_LOGGER.error(
"An unknown error happened while fetching data")
return False
if result:
self.last_results = {
device['mac'].replace('-', ':'): device['mac']
for device in result
}
return True
except ValueError:
_LOGGER.error("Router didn't respond with JSON. "
"Check if credentials are correct")
return False
if result:
self.last_results = {
device['mac'].replace('-', ':'): device['mac']
for device in result
}
return True
return False
def _log_out(self):
with self.lock:
_LOGGER.info("Logging out of router admin interface...")
_LOGGER.info("Logging out of router admin interface...")
url = ('http://{}/cgi-bin/luci/;stok={}/admin/system?'
'form=logout').format(self.host, self.stok)
referer = 'http://{}/webpages/index.html'.format(self.host)
url = ('http://{}/cgi-bin/luci/;stok={}/admin/system?'
'form=logout').format(self.host, self.stok)
referer = 'http://{}/webpages/index.html'.format(self.host)
requests.post(url,
params={'operation': 'write'},
headers={'referer': referer},
cookies={'sysauth': self.sysauth})
self.stok = ''
self.sysauth = ''
requests.post(url,
params={'operation': 'write'},
headers={'referer': referer},
cookies={'sysauth': self.sysauth})
self.stok = ''
self.sysauth = ''
class Tplink4DeviceScanner(TplinkDeviceScanner):
@ -318,38 +306,36 @@ class Tplink4DeviceScanner(TplinkDeviceScanner):
_LOGGER.error("Couldn't fetch auth tokens")
return False
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self):
"""Ensure the information from the TP-Link router is up to date.
Return boolean if scanning successful.
"""
with self.lock:
if (self.credentials == '') or (self.token == ''):
self._get_auth_tokens()
if (self.credentials == '') or (self.token == ''):
self._get_auth_tokens()
_LOGGER.info("Loading wireless clients...")
_LOGGER.info("Loading wireless clients...")
mac_results = []
mac_results = []
# Check both the 2.4GHz and 5GHz client list URLs
for clients_url in ('WlanStationRpm.htm', 'WlanStationRpm_5g.htm'):
url = 'http://{}/{}/userRpm/{}' \
.format(self.host, self.token, clients_url)
referer = 'http://{}'.format(self.host)
cookie = 'Authorization=Basic {}'.format(self.credentials)
# Check both the 2.4GHz and 5GHz client list URLs
for clients_url in ('WlanStationRpm.htm', 'WlanStationRpm_5g.htm'):
url = 'http://{}/{}/userRpm/{}' \
.format(self.host, self.token, clients_url)
referer = 'http://{}'.format(self.host)
cookie = 'Authorization=Basic {}'.format(self.credentials)
page = requests.get(url, headers={
'cookie': cookie,
'referer': referer
})
mac_results.extend(self.parse_macs.findall(page.text))
page = requests.get(url, headers={
'cookie': cookie,
'referer': referer
})
mac_results.extend(self.parse_macs.findall(page.text))
if not mac_results:
return False
if not mac_results:
return False
self.last_results = [mac.replace("-", ":") for mac in mac_results]
return True
self.last_results = [mac.replace("-", ":") for mac in mac_results]
return True
class Tplink5DeviceScanner(TplinkDeviceScanner):
@ -365,69 +351,67 @@ class Tplink5DeviceScanner(TplinkDeviceScanner):
"""Get firmware doesn't save the name of the wireless device."""
return None
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self):
"""Ensure the information from the TP-Link AP is up to date.
Return boolean if scanning successful.
"""
with self.lock:
_LOGGER.info("Loading wireless clients...")
_LOGGER.info("Loading wireless clients...")
base_url = 'http://{}'.format(self.host)
base_url = 'http://{}'.format(self.host)
header = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.12;"
" rv:53.0) Gecko/20100101 Firefox/53.0",
"Accept": "application/json, text/javascript, */*; q=0.01",
"Accept-Language": "Accept-Language: en-US,en;q=0.5",
"Accept-Encoding": "gzip, deflate",
"Content-Type": "application/x-www-form-urlencoded; "
"charset=UTF-8",
"X-Requested-With": "XMLHttpRequest",
"Referer": "http://" + self.host + "/",
"Connection": "keep-alive",
"Pragma": "no-cache",
"Cache-Control": "no-cache"
}
header = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.12;"
" rv:53.0) Gecko/20100101 Firefox/53.0",
"Accept": "application/json, text/javascript, */*; q=0.01",
"Accept-Language": "Accept-Language: en-US,en;q=0.5",
"Accept-Encoding": "gzip, deflate",
"Content-Type": "application/x-www-form-urlencoded; "
"charset=UTF-8",
"X-Requested-With": "XMLHttpRequest",
"Referer": "http://" + self.host + "/",
"Connection": "keep-alive",
"Pragma": "no-cache",
"Cache-Control": "no-cache"
}
password_md5 = hashlib.md5(
self.password.encode('utf')).hexdigest().upper()
password_md5 = hashlib.md5(
self.password.encode('utf')).hexdigest().upper()
# create a session to handle cookie easier
session = requests.session()
session.get(base_url, headers=header)
# create a session to handle cookie easier
session = requests.session()
session.get(base_url, headers=header)
login_data = {"username": self.username, "password": password_md5}
session.post(base_url, login_data, headers=header)
login_data = {"username": self.username, "password": password_md5}
session.post(base_url, login_data, headers=header)
# a timestamp is required to be sent as get parameter
timestamp = int(datetime.now().timestamp() * 1e3)
# a timestamp is required to be sent as get parameter
timestamp = int(datetime.now().timestamp() * 1e3)
client_list_url = '{}/data/monitor.client.client.json'.format(
base_url)
client_list_url = '{}/data/monitor.client.client.json'.format(
base_url)
get_params = {
'operation': 'load',
'_': timestamp
}
response = session.get(client_list_url,
headers=header,
params=get_params)
session.close()
try:
list_of_devices = response.json()
except ValueError:
_LOGGER.error("AP didn't respond with JSON. "
"Check if credentials are correct.")
return False
if list_of_devices:
self.last_results = {
device['MAC'].replace('-', ':'): device['DeviceName']
for device in list_of_devices['data']
}
return True
get_params = {
'operation': 'load',
'_': timestamp
}
response = session.get(client_list_url,
headers=header,
params=get_params)
session.close()
try:
list_of_devices = response.json()
except ValueError:
_LOGGER.error("AP didn't respond with JSON. "
"Check if credentials are correct.")
return False
if list_of_devices:
self.last_results = {
device['MAC'].replace('-', ':'): device['DeviceName']
for device in list_of_devices['data']
}
return True
return False

View File

@ -7,8 +7,6 @@ https://home-assistant.io/components/device_tracker.ubus/
import json
import logging
import re
import threading
from datetime import timedelta
import requests
import voluptuous as vol
@ -17,12 +15,8 @@ import homeassistant.helpers.config_validation as cv
from homeassistant.components.device_tracker import (
DOMAIN, PLATFORM_SCHEMA, DeviceScanner)
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME
from homeassistant.util import Throttle
from homeassistant.exceptions import HomeAssistantError
# Return cached results if last scan was less then this time ago.
MIN_TIME_BETWEEN_SCANS = timedelta(seconds=5)
_LOGGER = logging.getLogger(__name__)
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
@ -70,7 +64,6 @@ class UbusDeviceScanner(DeviceScanner):
self.password = config[CONF_PASSWORD]
self.parse_api_pattern = re.compile(r"(?P<param>\w*) = (?P<value>.*);")
self.lock = threading.Lock()
self.last_results = {}
self.url = 'http://{}/ubus'.format(host)
@ -89,34 +82,32 @@ class UbusDeviceScanner(DeviceScanner):
@_refresh_on_acccess_denied
def get_device_name(self, device):
"""Return the name of the given device or None if we don't know."""
with self.lock:
if self.leasefile is None:
result = _req_json_rpc(
self.url, self.session_id, 'call', 'uci', 'get',
config="dhcp", type="dnsmasq")
if result:
values = result["values"].values()
self.leasefile = next(iter(values))["leasefile"]
else:
return
if self.leasefile is None:
result = _req_json_rpc(
self.url, self.session_id, 'call', 'uci', 'get',
config="dhcp", type="dnsmasq")
if result:
values = result["values"].values()
self.leasefile = next(iter(values))["leasefile"]
else:
return
if self.mac2name is None:
result = _req_json_rpc(
self.url, self.session_id, 'call', 'file', 'read',
path=self.leasefile)
if result:
self.mac2name = dict()
for line in result["data"].splitlines():
hosts = line.split(" ")
self.mac2name[hosts[1].upper()] = hosts[3]
else:
# Error, handled in the _req_json_rpc
return
if self.mac2name is None:
result = _req_json_rpc(
self.url, self.session_id, 'call', 'file', 'read',
path=self.leasefile)
if result:
self.mac2name = dict()
for line in result["data"].splitlines():
hosts = line.split(" ")
self.mac2name[hosts[1].upper()] = hosts[3]
else:
# Error, handled in the _req_json_rpc
return
return self.mac2name.get(device.upper(), None)
return self.mac2name.get(device.upper(), None)
@_refresh_on_acccess_denied
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self):
"""Ensure the information from the Luci router is up to date.
@ -125,25 +116,24 @@ class UbusDeviceScanner(DeviceScanner):
if not self.success_init:
return False
with self.lock:
_LOGGER.info("Checking ARP")
_LOGGER.info("Checking ARP")
if not self.hostapd:
hostapd = _req_json_rpc(
self.url, self.session_id, 'list', 'hostapd.*', '')
self.hostapd.extend(hostapd.keys())
if not self.hostapd:
hostapd = _req_json_rpc(
self.url, self.session_id, 'list', 'hostapd.*', '')
self.hostapd.extend(hostapd.keys())
self.last_results = []
results = 0
for hostapd in self.hostapd:
result = _req_json_rpc(
self.url, self.session_id, 'call', hostapd, 'get_clients')
self.last_results = []
results = 0
for hostapd in self.hostapd:
result = _req_json_rpc(
self.url, self.session_id, 'call', hostapd, 'get_clients')
if result:
results = results + 1
self.last_results.extend(result['clients'].keys())
if result:
results = results + 1
self.last_results.extend(result['clients'].keys())
return bool(results)
return bool(results)
def _req_json_rpc(url, session_id, rpcmethod, subsystem, method, **params):

View File

@ -9,8 +9,7 @@ import logging
from homeassistant.util import slugify
from homeassistant.helpers.dispatcher import (
dispatcher_connect, dispatcher_send)
from homeassistant.components.volvooncall import (
DATA_KEY, SIGNAL_VEHICLE_SEEN)
from homeassistant.components.volvooncall import DATA_KEY, SIGNAL_VEHICLE_SEEN
_LOGGER = logging.getLogger(__name__)

View File

@ -5,8 +5,6 @@ For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/device_tracker.xiaomi/
"""
import logging
import threading
from datetime import timedelta
import requests
import voluptuous as vol
@ -15,12 +13,9 @@ import homeassistant.helpers.config_validation as cv
from homeassistant.components.device_tracker import (
DOMAIN, PLATFORM_SCHEMA, DeviceScanner)
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME
from homeassistant.util import Throttle
_LOGGER = logging.getLogger(__name__)
MIN_TIME_BETWEEN_SCANS = timedelta(seconds=5)
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_HOST): cv.string,
vol.Required(CONF_USERNAME, default='admin'): cv.string,
@ -47,8 +42,6 @@ class XiaomiDeviceScanner(DeviceScanner):
self.username = config[CONF_USERNAME]
self.password = config[CONF_PASSWORD]
self.lock = threading.Lock()
self.last_results = {}
self.token = _get_token(self.host, self.username, self.password)
@ -62,21 +55,19 @@ class XiaomiDeviceScanner(DeviceScanner):
def get_device_name(self, device):
"""Return the name of the given device or None if we don't know."""
with self.lock:
if self.mac2name is None:
result = self._retrieve_list_with_retry()
if result:
hosts = [x for x in result
if 'mac' in x and 'name' in x]
mac2name_list = [
(x['mac'].upper(), x['name']) for x in hosts]
self.mac2name = dict(mac2name_list)
else:
# Error, handled in the _retrieve_list_with_retry
return
return self.mac2name.get(device.upper(), None)
if self.mac2name is None:
result = self._retrieve_list_with_retry()
if result:
hosts = [x for x in result
if 'mac' in x and 'name' in x]
mac2name_list = [
(x['mac'].upper(), x['name']) for x in hosts]
self.mac2name = dict(mac2name_list)
else:
# Error, handled in the _retrieve_list_with_retry
return
return self.mac2name.get(device.upper(), None)
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self):
"""Ensure the informations from the router are up to date.
@ -85,12 +76,11 @@ class XiaomiDeviceScanner(DeviceScanner):
if not self.success_init:
return False
with self.lock:
result = self._retrieve_list_with_retry()
if result:
self._store_result(result)
return True
return False
result = self._retrieve_list_with_retry()
if result:
self._store_result(result)
return True
return False
def _retrieve_list_with_retry(self):
"""Retrieve the device list with a retry if token is invalid.