diff --git a/homeassistant/components/tplink/device_tracker.py b/homeassistant/components/tplink/device_tracker.py deleted file mode 100644 index ce17f6e465f..00000000000 --- a/homeassistant/components/tplink/device_tracker.py +++ /dev/null @@ -1,508 +0,0 @@ -"""Support for TP-Link routers.""" -import base64 -from datetime import datetime -import hashlib -import logging -import re - -from aiohttp.hdrs import ( - ACCEPT, - ACCEPT_ENCODING, - ACCEPT_LANGUAGE, - CACHE_CONTROL, - CONNECTION, - CONTENT_TYPE, - COOKIE, - KEEP_ALIVE, - PRAGMA, - REFERER, - USER_AGENT, -) -import requests -from tplink.tplink import TpLinkClient -import voluptuous as vol - -from homeassistant.components.device_tracker import ( - DOMAIN, - PLATFORM_SCHEMA, - DeviceScanner, -) -from homeassistant.const import ( - CONF_HOST, - CONF_PASSWORD, - CONF_USERNAME, - HTTP_HEADER_X_REQUESTED_WITH, -) -import homeassistant.helpers.config_validation as cv - -_LOGGER = logging.getLogger(__name__) - -HTTP_HEADER_NO_CACHE = "no-cache" - -PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend( - { - vol.Required(CONF_HOST): cv.string, - vol.Required(CONF_PASSWORD): cv.string, - vol.Required(CONF_USERNAME): cv.string, - } -) - - -def get_scanner(hass, config): - """ - Validate the configuration and return a TP-Link scanner. - - The default way of integrating devices is to use a pypi - - package, The TplinkDeviceScanner has been refactored - - to depend on a pypi package, the other implementations - - should be gradually migrated in the pypi package - - """ - _LOGGER.warning( - "TP-Link device tracker is unmaintained and will be " - "removed in the future releases if no maintainer is " - "found. If you have interest in this integration, " - "feel free to create a pull request to move this code " - "to a new 'tplink_router' integration and refactoring " - "the device-specific parts to the tplink library" - ) - for cls in [ - TplinkDeviceScanner, - Tplink5DeviceScanner, - Tplink4DeviceScanner, - Tplink3DeviceScanner, - Tplink2DeviceScanner, - Tplink1DeviceScanner, - ]: - scanner = cls(config[DOMAIN]) - if scanner.success_init: - return scanner - - return None - - -class TplinkDeviceScanner(DeviceScanner): - """Queries the router for connected devices.""" - - def __init__(self, config): - """Initialize the scanner.""" - - host = config[CONF_HOST] - password = config[CONF_PASSWORD] - username = config[CONF_USERNAME] - - self.success_init = False - try: - self.tplink_client = TpLinkClient(password, host=host, username=username) - - self.last_results = {} - - self.success_init = self._update_info() - except requests.exceptions.RequestException: - _LOGGER.debug("RequestException in %s", self.__class__.__name__) - - def scan_devices(self): - """Scan for new devices and return a list with found device IDs.""" - self._update_info() - return self.last_results.keys() - - def get_device_name(self, device): - """Get the name of the device.""" - return self.last_results.get(device) - - def _update_info(self): - """Ensure the information from the TP-Link router is up to date. - - Return boolean if scanning successful. - """ - _LOGGER.info("Loading wireless clients...") - result = self.tplink_client.get_connected_devices() - - if result: - self.last_results = result - return True - - return False - - -class Tplink1DeviceScanner(DeviceScanner): - """This class queries a wireless router running TP-Link firmware.""" - - def __init__(self, config): - """Initialize the scanner.""" - host = config[CONF_HOST] - username, password = config[CONF_USERNAME], config[CONF_PASSWORD] - - self.parse_macs = re.compile( - "[0-9A-F]{2}-[0-9A-F]{2}-[0-9A-F]{2}-" - + "[0-9A-F]{2}-[0-9A-F]{2}-[0-9A-F]{2}" - ) - - self.host = host - self.username = username - self.password = password - - self.last_results = {} - self.success_init = False - try: - self.success_init = self._update_info() - except requests.exceptions.RequestException: - _LOGGER.debug("RequestException in %s", self.__class__.__name__) - - def scan_devices(self): - """Scan for new devices and return a list with found device IDs.""" - self._update_info() - return self.last_results - - def get_device_name(self, device): - """Get firmware doesn't save the name of the wireless device.""" - return None - - def _update_info(self): - """Ensure the information from the TP-Link router is up to date. - - Return boolean if scanning successful. - """ - _LOGGER.info("Loading wireless clients...") - - url = f"http://{self.host}/userRpm/WlanStationRpm.htm" - referer = f"http://{self.host}" - page = requests.get( - url, - auth=(self.username, self.password), - headers={REFERER: referer}, - timeout=4, - ) - - result = self.parse_macs.findall(page.text) - - if result: - self.last_results = [mac.replace("-", ":") for mac in result] - return True - - return False - - -class Tplink2DeviceScanner(Tplink1DeviceScanner): - """This class queries a router with newer version of TP-Link firmware.""" - - def scan_devices(self): - """Scan for new devices and return a list with found device IDs.""" - self._update_info() - return self.last_results.keys() - - def get_device_name(self, device): - """Get firmware doesn't save the name of the wireless device.""" - return self.last_results.get(device) - - def _update_info(self): - """Ensure the information from the TP-Link router is up to date. - - Return boolean if scanning successful. - """ - _LOGGER.info("Loading wireless clients...") - - url = f"http://{self.host}/data/map_access_wireless_client_grid.json" - referer = f"http://{self.host}" - - # Router uses Authorization cookie instead of header - # Let's create the cookie - username_password = f"{self.username}:{self.password}" - b64_encoded_username_password = base64.b64encode( - username_password.encode("ascii") - ).decode("ascii") - cookie = f"Authorization=Basic {b64_encoded_username_password}" - - response = requests.post( - url, headers={REFERER: referer, COOKIE: cookie}, timeout=4 - ) - - try: - result = response.json().get("data") - except ValueError: - _LOGGER.error( - "Router didn't respond with JSON. " "Check if credentials are correct." - ) - return False - - if result: - self.last_results = { - device["mac_addr"].replace("-", ":"): device["name"] - for device in result - } - return True - - return False - - -class Tplink3DeviceScanner(Tplink1DeviceScanner): - """This class queries the Archer C9 router with version 150811 or high.""" - - def __init__(self, config): - """Initialize the scanner.""" - self.stok = "" - self.sysauth = "" - super().__init__(config) - - def scan_devices(self): - """Scan for new devices and return a list with found device IDs.""" - self._update_info() - self._log_out() - return self.last_results.keys() - - def get_device_name(self, device): - """Get the firmware doesn't save the name of the wireless device. - - We are forced to use the MAC address as name here. - """ - return self.last_results.get(device) - - def _get_auth_tokens(self): - """Retrieve auth tokens from the router.""" - _LOGGER.info("Retrieving auth tokens...") - - url = f"http://{self.host}/cgi-bin/luci/;stok=/login?form=login" - referer = f"http://{self.host}/webpages/login.html" - - # If possible implement RSA encryption of password here. - response = requests.post( - url, - params={ - "operation": "login", - "username": self.username, - "password": self.password, - }, - headers={REFERER: referer}, - timeout=4, - ) - - try: - self.stok = response.json().get("data").get("stok") - _LOGGER.info(self.stok) - regex_result = re.search("sysauth=(.*);", response.headers["set-cookie"]) - self.sysauth = regex_result.group(1) - _LOGGER.info(self.sysauth) - return True - except (ValueError, KeyError): - _LOGGER.error("Couldn't fetch auth tokens! Response was: %s", response.text) - return False - - def _update_info(self): - """Ensure the information from the TP-Link router is up to date. - - Return boolean if scanning successful. - """ - if (self.stok == "") or (self.sysauth == ""): - self._get_auth_tokens() - - _LOGGER.info("Loading wireless clients...") - - url = ( - "http://{}/cgi-bin/luci/;stok={}/admin/wireless?" "form=statistics" - ).format(self.host, self.stok) - referer = f"http://{self.host}/webpages/index.html" - - response = requests.post( - url, - params={"operation": "load"}, - headers={REFERER: referer}, - cookies={"sysauth": self.sysauth}, - timeout=5, - ) - - try: - json_response = response.json() - - if json_response.get("success"): - result = response.json().get("data") - else: - if json_response.get("errorcode") == "timeout": - _LOGGER.info("Token timed out. Relogging on next scan") - self.stok = "" - self.sysauth = "" - return False - _LOGGER.error("An unknown error happened while fetching data") - return False - except ValueError: - _LOGGER.error( - "Router didn't respond with JSON. " "Check if credentials are correct" - ) - return False - - if result: - self.last_results = { - device["mac"].replace("-", ":"): device["mac"] for device in result - } - return True - - return False - - def _log_out(self): - _LOGGER.info("Logging out of router admin interface...") - - url = ("http://{}/cgi-bin/luci/;stok={}/admin/system?" "form=logout").format( - self.host, self.stok - ) - referer = f"http://{self.host}/webpages/index.html" - - requests.post( - url, - params={"operation": "write"}, - headers={REFERER: referer}, - cookies={"sysauth": self.sysauth}, - ) - self.stok = "" - self.sysauth = "" - - -class Tplink4DeviceScanner(Tplink1DeviceScanner): - """This class queries an Archer C7 router with TP-Link firmware 150427.""" - - def __init__(self, config): - """Initialize the scanner.""" - self.credentials = "" - self.token = "" - super().__init__(config) - - def scan_devices(self): - """Scan for new devices and return a list with found device IDs.""" - self._update_info() - return self.last_results - - def get_device_name(self, device): - """Get the name of the wireless device.""" - return None - - def _get_auth_tokens(self): - """Retrieve auth tokens from the router.""" - _LOGGER.info("Retrieving auth tokens...") - url = f"http://{self.host}/userRpm/LoginRpm.htm?Save=Save" - - # Generate md5 hash of password. The C7 appears to use the first 15 - # characters of the password only, so we truncate to remove additional - # characters from being hashed. - password = hashlib.md5(self.password.encode("utf")[:15]).hexdigest() - credentials = f"{self.username}:{password}".encode("utf") - - # Encode the credentials to be sent as a cookie. - self.credentials = base64.b64encode(credentials).decode("utf") - - # Create the authorization cookie. - cookie = f"Authorization=Basic {self.credentials}" - - response = requests.get(url, headers={COOKIE: cookie}) - - try: - result = re.search( - r"window.parent.location.href = " - r'"https?:\/\/.*\/(.*)\/userRpm\/Index.htm";', - response.text, - ) - if not result: - return False - self.token = result.group(1) - return True - except ValueError: - _LOGGER.error("Couldn't fetch auth tokens") - return False - - def _update_info(self): - """Ensure the information from the TP-Link router is up to date. - - Return boolean if scanning successful. - """ - if (self.credentials == "") or (self.token == ""): - self._get_auth_tokens() - - _LOGGER.info("Loading wireless clients...") - - mac_results = [] - - # Check both the 2.4GHz and 5GHz client list URLs - for clients_url in ("WlanStationRpm.htm", "WlanStationRpm_5g.htm"): - url = f"http://{self.host}/{self.token}/userRpm/{clients_url}" - referer = f"http://{self.host}" - cookie = f"Authorization=Basic {self.credentials}" - - page = requests.get(url, headers={COOKIE: cookie, REFERER: referer}) - mac_results.extend(self.parse_macs.findall(page.text)) - - if not mac_results: - return False - - self.last_results = [mac.replace("-", ":") for mac in mac_results] - return True - - -class Tplink5DeviceScanner(Tplink1DeviceScanner): - """This class queries a TP-Link EAP-225 AP with newer TP-Link FW.""" - - def scan_devices(self): - """Scan for new devices and return a list with found MAC IDs.""" - self._update_info() - return self.last_results.keys() - - def get_device_name(self, device): - """Get firmware doesn't save the name of the wireless device.""" - return None - - def _update_info(self): - """Ensure the information from the TP-Link AP is up to date. - - Return boolean if scanning successful. - """ - _LOGGER.info("Loading wireless clients...") - - base_url = f"http://{self.host}" - - header = { - USER_AGENT: "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.12;" - " rv:53.0) Gecko/20100101 Firefox/53.0", - ACCEPT: "application/json, text/javascript, */*; q=0.01", - ACCEPT_LANGUAGE: "Accept-Language: en-US,en;q=0.5", - ACCEPT_ENCODING: "gzip, deflate", - CONTENT_TYPE: "application/x-www-form-urlencoded; charset=UTF-8", - HTTP_HEADER_X_REQUESTED_WITH: "XMLHttpRequest", - REFERER: f"http://{self.host}/", - CONNECTION: KEEP_ALIVE, - PRAGMA: HTTP_HEADER_NO_CACHE, - CACHE_CONTROL: HTTP_HEADER_NO_CACHE, - } - - password_md5 = hashlib.md5(self.password.encode("utf")).hexdigest().upper() - - # Create a session to handle cookie easier - session = requests.session() - session.get(base_url, headers=header) - - login_data = {"username": self.username, "password": password_md5} - session.post(base_url, login_data, headers=header) - - # A timestamp is required to be sent as get parameter - timestamp = int(datetime.now().timestamp() * 1e3) - - client_list_url = f"{base_url}/data/monitor.client.client.json" - - get_params = {"operation": "load", "_": timestamp} - - response = session.get(client_list_url, headers=header, params=get_params) - session.close() - try: - list_of_devices = response.json() - except ValueError: - _LOGGER.error( - "AP didn't respond with JSON. " "Check if credentials are correct" - ) - return False - - if list_of_devices: - self.last_results = { - device["MAC"].replace("-", ":"): device["DeviceName"] - for device in list_of_devices["data"] - } - return True - - return False diff --git a/homeassistant/components/tplink/manifest.json b/homeassistant/components/tplink/manifest.json index f299e02e2d3..c2a2197c844 100644 --- a/homeassistant/components/tplink/manifest.json +++ b/homeassistant/components/tplink/manifest.json @@ -4,8 +4,7 @@ "config_flow": true, "documentation": "https://www.home-assistant.io/integrations/tplink", "requirements": [ - "pyHS100==0.3.5", - "tplink==0.2.1" + "pyHS100==0.3.5" ], "dependencies": [], "codeowners": [ diff --git a/requirements_all.txt b/requirements_all.txt index ad17b59b455..c79e36413a9 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -1899,9 +1899,6 @@ total_connect_client==0.28 # homeassistant.components.tplink_lte tp-connected==0.0.4 -# homeassistant.components.tplink -tplink==0.2.1 - # homeassistant.components.transmission transmissionrpc==0.11 diff --git a/requirements_test_all.txt b/requirements_test_all.txt index 9b59a1d806b..7912de963ab 100644 --- a/requirements_test_all.txt +++ b/requirements_test_all.txt @@ -586,9 +586,6 @@ tellduslive==0.10.10 # homeassistant.components.toon toonapilib==3.2.4 -# homeassistant.components.tplink -tplink==0.2.1 - # homeassistant.components.transmission transmissionrpc==0.11 diff --git a/tests/components/tplink/test_device_tracker.py b/tests/components/tplink/test_device_tracker.py deleted file mode 100644 index bbe73dc121a..00000000000 --- a/tests/components/tplink/test_device_tracker.py +++ /dev/null @@ -1,71 +0,0 @@ -"""The tests for the tplink device tracker platform.""" - -import os -import pytest - -from homeassistant.components.device_tracker.legacy import YAML_DEVICES -from homeassistant.components.tplink.device_tracker import Tplink4DeviceScanner -from homeassistant.const import CONF_PLATFORM, CONF_PASSWORD, CONF_USERNAME, CONF_HOST -import requests_mock - - -@pytest.fixture(autouse=True) -def setup_comp(hass): - """Initialize components.""" - yaml_devices = hass.config.path(YAML_DEVICES) - yield - if os.path.isfile(yaml_devices): - os.remove(yaml_devices) - - -async def test_get_mac_addresses_from_both_bands(hass): - """Test grabbing the mac addresses from 2.4 and 5 GHz clients pages.""" - with requests_mock.Mocker() as m: - conf_dict = { - CONF_PLATFORM: "tplink", - CONF_HOST: "fake-host", - CONF_USERNAME: "fake_user", - CONF_PASSWORD: "fake_pass", - } - - # Mock the token retrieval process - FAKE_TOKEN = "fake_token" - fake_auth_token_response = ( - "window.parent.location.href = " - '"https://a/{}/userRpm/Index.htm";'.format(FAKE_TOKEN) - ) - - m.get( - "http://{}/userRpm/LoginRpm.htm?Save=Save".format(conf_dict[CONF_HOST]), - text=fake_auth_token_response, - ) - - FAKE_MAC_1 = "CA-FC-8A-C8-BB-53" - FAKE_MAC_2 = "6C-48-83-21-46-8D" - FAKE_MAC_3 = "77-98-75-65-B1-2B" - mac_response_2_4 = "{} {}".format(FAKE_MAC_1, FAKE_MAC_2) - mac_response_5 = "{}".format(FAKE_MAC_3) - - # Mock the 2.4 GHz clients page - m.get( - "http://{}/{}/userRpm/WlanStationRpm.htm".format( - conf_dict[CONF_HOST], FAKE_TOKEN - ), - text=mac_response_2_4, - ) - - # Mock the 5 GHz clients page - m.get( - "http://{}/{}/userRpm/WlanStationRpm_5g.htm".format( - conf_dict[CONF_HOST], FAKE_TOKEN - ), - text=mac_response_5, - ) - - tplink = Tplink4DeviceScanner(conf_dict) - - expected_mac_results = [ - mac.replace("-", ":") for mac in [FAKE_MAC_1, FAKE_MAC_2, FAKE_MAC_3] - ] - - assert tplink.last_results == expected_mac_results