From 601395bc12e0a9061079e1f08b1132a93c2353f5 Mon Sep 17 00:00:00 2001 From: Teagan Glenn Date: Sat, 3 Sep 2016 10:38:17 -0600 Subject: [PATCH] Automatic ODB device tracker & device tracker attributes (#3035) --- .../components/device_tracker/__init__.py | 25 +- .../components/device_tracker/automatic.py | 161 +++++++++++ .../device_tracker/test_automatic.py | 254 ++++++++++++++++++ 3 files changed, 434 insertions(+), 6 deletions(-) create mode 100644 homeassistant/components/device_tracker/automatic.py create mode 100644 tests/components/device_tracker/test_automatic.py diff --git a/homeassistant/components/device_tracker/__init__.py b/homeassistant/components/device_tracker/__init__.py index a4f65ab4ea4..4247213087b 100644 --- a/homeassistant/components/device_tracker/__init__.py +++ b/homeassistant/components/device_tracker/__init__.py @@ -62,6 +62,7 @@ ATTR_HOST_NAME = 'host_name' ATTR_LOCATION_NAME = 'location_name' ATTR_GPS = 'gps' ATTR_BATTERY = 'battery' +ATTR_ATTRIBUTES = 'attributes' PLATFORM_SCHEMA = cv.PLATFORM_SCHEMA.extend({ vol.Optional(CONF_SCAN_INTERVAL): cv.positive_int, # seconds @@ -86,10 +87,11 @@ def is_on(hass: HomeAssistantType, entity_id: str=None): return hass.states.is_state(entity, STATE_HOME) +# pylint: disable=too-many-arguments def see(hass: HomeAssistantType, mac: str=None, dev_id: str=None, host_name: str=None, location_name: str=None, gps: GPSType=None, gps_accuracy=None, - battery=None): # pylint: disable=too-many-arguments + battery=None, attributes: dict=None): """Call service to notify you see device.""" data = {key: value for key, value in ((ATTR_MAC, mac), @@ -99,6 +101,9 @@ def see(hass: HomeAssistantType, mac: str=None, dev_id: str=None, (ATTR_GPS, gps), (ATTR_GPS_ACCURACY, gps_accuracy), (ATTR_BATTERY, battery)) if value is not None} + if attributes: + for key, value in attributes: + data[key] = value hass.services.call(DOMAIN, SERVICE_SEE, data) @@ -164,7 +169,7 @@ def setup(hass: HomeAssistantType, config: ConfigType): """Service to see a device.""" args = {key: value for key, value in call.data.items() if key in (ATTR_MAC, ATTR_DEV_ID, ATTR_HOST_NAME, ATTR_LOCATION_NAME, - ATTR_GPS, ATTR_GPS_ACCURACY, ATTR_BATTERY)} + ATTR_GPS, ATTR_GPS_ACCURACY, ATTR_BATTERY, ATTR_ATTRIBUTES)} tracker.see(**args) descriptions = load_yaml_config_file( @@ -202,7 +207,7 @@ class DeviceTracker(object): def see(self, mac: str=None, dev_id: str=None, host_name: str=None, location_name: str=None, gps: GPSType=None, gps_accuracy=None, - battery: str=None): + battery: str=None, attributes: dict=None): """Notify the device tracker that you see a device.""" with self.lock: if mac is None and dev_id is None: @@ -218,7 +223,7 @@ class DeviceTracker(object): if device: device.seen(host_name, location_name, gps, gps_accuracy, - battery) + battery, attributes) if device.track: device.update_ha_state() return @@ -232,7 +237,8 @@ class DeviceTracker(object): if mac is not None: self.mac_to_dev[mac] = device - device.seen(host_name, location_name, gps, gps_accuracy, battery) + device.seen(host_name, location_name, gps, gps_accuracy, battery, + attributes) if device.track: device.update_ha_state() @@ -267,6 +273,7 @@ class Device(Entity): gps_accuracy = 0 last_seen = None # type: dt_util.dt.datetime battery = None # type: str + attributes = None # type: dict # Track if the last update of this device was HOME. last_update_home = False @@ -330,6 +337,10 @@ class Device(Entity): if self.battery: attr[ATTR_BATTERY] = self.battery + if self.attributes: + for key, value in self.attributes: + attr[key] = value + return attr @property @@ -338,13 +349,15 @@ class Device(Entity): return self.away_hide and self.state != STATE_HOME def seen(self, host_name: str=None, location_name: str=None, - gps: GPSType=None, gps_accuracy=0, battery: str=None): + gps: GPSType=None, gps_accuracy=0, battery: str=None, + attributes: dict=None): """Mark the device as seen.""" self.last_seen = dt_util.utcnow() self.host_name = host_name self.location_name = location_name self.gps_accuracy = gps_accuracy or 0 self.battery = battery + self.attributes = attributes self.gps = None if gps is not None: try: diff --git a/homeassistant/components/device_tracker/automatic.py b/homeassistant/components/device_tracker/automatic.py new file mode 100644 index 00000000000..927c515b3a5 --- /dev/null +++ b/homeassistant/components/device_tracker/automatic.py @@ -0,0 +1,161 @@ +""" +Support for the Automatic platform. + +For more details about this platform, please refer to the documentation at +https://home-assistant.io/components/device_tracker.automatic/ +""" +from datetime import timedelta +import logging +import re +import requests + +import voluptuous as vol + +from homeassistant.components.device_tracker import (PLATFORM_SCHEMA, + ATTR_ATTRIBUTES) +from homeassistant.const import CONF_USERNAME, CONF_PASSWORD +import homeassistant.helpers.config_validation as cv +from homeassistant.util import Throttle, datetime as dt_util + +_LOGGER = logging.getLogger(__name__) + +MIN_TIME_BETWEEN_SCANS = timedelta(seconds=30) + +CONF_CLIENT_ID = 'client_id' +CONF_SECRET = 'secret' +CONF_DEVICES = 'devices' + +SCOPE = 'scope:location scope:vehicle:profile scope:user:profile scope:trip' + +ATTR_ACCESS_TOKEN = 'access_token' +ATTR_EXPIRES_IN = 'expires_in' +ATTR_RESULTS = 'results' +ATTR_VEHICLE = 'vehicle' +ATTR_ENDED_AT = 'ended_at' +ATTR_END_LOCATION = 'end_location' + +URL_AUTHORIZE = 'https://accounts.automatic.com/oauth/access_token/' +URL_VEHICLES = 'https://api.automatic.com/vehicle/' +URL_TRIPS = 'https://api.automatic.com/trip/' + +_VEHICLE_ID_REGEX = re.compile( + (URL_VEHICLES + '(.*)?[/]$').replace('/', r'\/')) + +PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({ + vol.Required(CONF_CLIENT_ID): cv.string, + vol.Required(CONF_SECRET): cv.string, + vol.Required(CONF_USERNAME): cv.string, + vol.Required(CONF_PASSWORD): cv.string, + vol.Optional(CONF_DEVICES): vol.All(cv.ensure_list, [cv.string]) +}) + + +def setup_scanner(hass, config: dict, see): + """Validate the configuration and return an Automatic scanner.""" + try: + AutomaticDeviceScanner(config, see) + except requests.HTTPError as err: + _LOGGER.error(str(err)) + return False + + return True + + +class AutomaticDeviceScanner(object): + """A class representing an Automatic device.""" + + def __init__(self, config: dict, see) -> None: + """Initialize the automatic device scanner.""" + self._devices = config.get(CONF_DEVICES, None) + self._access_token_payload = { + 'username': config.get(CONF_USERNAME), + 'password': config.get(CONF_PASSWORD), + 'client_id': config.get(CONF_CLIENT_ID), + 'client_secret': config.get(CONF_SECRET), + 'grant_type': 'password', + 'scope': SCOPE + } + self._headers = None + self._token_expires = dt_util.now() + self.last_results = {} + self.last_trips = {} + self.see = see + + self.scan_devices() + + def scan_devices(self): + """Scan for new devices and return a list with found device IDs.""" + self._update_info() + + return [item['id'] for item in self.last_results] + + def get_device_name(self, device): + """Get the device name from id.""" + vehicle = [item['display_name'] for item in self.last_results + if item['id'] == device] + + return vehicle[0] + + def _update_headers(self): + """Get the access token from automatic.""" + if self._headers is None or self._token_expires <= dt_util.now(): + resp = requests.post( + URL_AUTHORIZE, + data=self._access_token_payload) + + resp.raise_for_status() + + json = resp.json() + + access_token = json[ATTR_ACCESS_TOKEN] + self._token_expires = dt_util.now() + timedelta( + seconds=json[ATTR_EXPIRES_IN]) + self._headers = { + 'Authorization': 'Bearer {}'.format(access_token) + } + + @Throttle(MIN_TIME_BETWEEN_SCANS) + def _update_info(self) -> None: + """Update the device info.""" + _LOGGER.info('Updating devices') + self._update_headers() + + response = requests.get(URL_VEHICLES, headers=self._headers) + + response.raise_for_status() + + self.last_results = [item for item in response.json()[ATTR_RESULTS] + if self._devices is None or item[ + 'display_name'] in self._devices] + + response = requests.get(URL_TRIPS, headers=self._headers) + + if response.status_code == 200: + for trip in response.json()[ATTR_RESULTS]: + vehicle_id = _VEHICLE_ID_REGEX.match( + trip[ATTR_VEHICLE]).group(1) + if vehicle_id not in self.last_trips: + self.last_trips[vehicle_id] = trip + elif self.last_trips[vehicle_id][ATTR_ENDED_AT] < trip[ + ATTR_ENDED_AT]: + self.last_trips[vehicle_id] = trip + + for vehicle in self.last_results: + dev_id = vehicle.get('id') + + attrs = { + 'fuel_level': vehicle.get('fuel_level_percent') + } + + kwargs = { + 'dev_id': dev_id, + 'mac': dev_id, + ATTR_ATTRIBUTES: attrs + } + + if dev_id in self.last_trips: + end_location = self.last_trips[dev_id][ATTR_END_LOCATION] + kwargs['gps'] = (end_location['lat'], end_location['lon']) + kwargs['gps_accuracy'] = end_location['accuracy_m'] + + self.see(**kwargs) diff --git a/tests/components/device_tracker/test_automatic.py b/tests/components/device_tracker/test_automatic.py new file mode 100644 index 00000000000..e026d91a43c --- /dev/null +++ b/tests/components/device_tracker/test_automatic.py @@ -0,0 +1,254 @@ +"""Test the automatic device tracker platform.""" + +import logging +import requests +import unittest +from unittest.mock import patch + +from homeassistant.components.device_tracker.automatic import ( + URL_AUTHORIZE, URL_VEHICLES, URL_TRIPS, setup_scanner, + AutomaticDeviceScanner) + +_LOGGER = logging.getLogger(__name__) + +INVALID_USERNAME = 'bob' +VALID_USERNAME = 'jim' +PASSWORD = 'password' +CLIENT_ID = '12345' +CLIENT_SECRET = '54321' +FUEL_LEVEL = 77.2 +LATITUDE = 32.82336 +LONGITUDE = -117.23743 +ACCURACY = 8 +DISPLAY_NAME = 'My Vehicle' + + +def mocked_requests(*args, **kwargs): + """Mock requests.get invocations.""" + class MockResponse: + """Class to represent a mocked response.""" + + def __init__(self, json_data, status_code): + """Initialize the mock response class.""" + self.json_data = json_data + self.status_code = status_code + + def json(self): + """Return the json of the response.""" + return self.json_data + + @property + def content(self): + """Return the content of the response.""" + return self.json() + + def raise_for_status(self): + """Raise an HTTPError if status is not 200.""" + if self.status_code != 200: + raise requests.HTTPError(self.status_code) + + data = kwargs.get('data') + + if data and data.get('username', None) == INVALID_USERNAME: + return MockResponse({ + "error": "invalid_credentials" + }, 401) + elif str(args[0]).startswith(URL_AUTHORIZE): + return MockResponse({ + "user": { + "sid": "sid", + "id": "id" + }, + "token_type": "Bearer", + "access_token": "accesstoken", + "refresh_token": "refreshtoken", + "expires_in": 31521669, + "scope": "" + }, 200) + elif str(args[0]).startswith(URL_VEHICLES): + return MockResponse({ + "_metadata": { + "count": 2, + "next": None, + "previous": None + }, + "results": [ + { + "url": "https://api.automatic.com/vehicle/vid/", + "id": "vid", + "created_at": "2016-03-05T20:05:16.240000Z", + "updated_at": "2016-08-29T01:52:59.597898Z", + "make": "Honda", + "model": "Element", + "year": 2007, + "submodel": "EX", + "display_name": DISPLAY_NAME, + "fuel_grade": "regular", + "fuel_level_percent": FUEL_LEVEL, + "active_dtcs": [] + }] + }, 200) + elif str(args[0]).startswith(URL_TRIPS): + return MockResponse({ + "_metadata": { + "count": 1594, + "next": "https://api.automatic.com/trip/?page=2", + "previous": None + }, + "results": [ + { + "url": "https://api.automatic.com/trip/tid1/", + "id": "tid1", + "driver": "https://api.automatic.com/user/uid/", + "user": "https://api.automatic.com/user/uid/", + "started_at": "2016-08-28T19:37:23.986000Z", + "ended_at": "2016-08-28T19:43:22.500000Z", + "distance_m": 3931.6, + "duration_s": 358.5, + "vehicle": "https://api.automatic.com/vehicle/vid/", + "start_location": { + "lat": 32.87336, + "lon": -117.22743, + "accuracy_m": 10 + }, + "start_address": { + "name": "123 Fake St, Nowhere, NV 12345", + "display_name": "123 Fake St, Nowhere, NV", + "street_number": "Unknown", + "street_name": "Fake St", + "city": "Nowhere", + "state": "NV", + "country": "US" + }, + "end_location": { + "lat": LATITUDE, + "lon": LONGITUDE, + "accuracy_m": ACCURACY + }, + "end_address": { + "name": "321 Fake St, Nowhere, NV 12345", + "display_name": "321 Fake St, Nowhere, NV", + "street_number": "Unknown", + "street_name": "Fake St", + "city": "Nowhere", + "state": "NV", + "country": "US" + }, + "path": "path", + "vehicle_events": [], + "start_timezone": "America/Denver", + "end_timezone": "America/Denver", + "idling_time_s": 0, + "tags": [] + }, + { + "url": "https://api.automatic.com/trip/tid2/", + "id": "tid2", + "driver": "https://api.automatic.com/user/uid/", + "user": "https://api.automatic.com/user/uid/", + "started_at": "2016-08-28T18:48:00.727000Z", + "ended_at": "2016-08-28T18:55:25.800000Z", + "distance_m": 3969.1, + "duration_s": 445.1, + "vehicle": "https://api.automatic.com/vehicle/vid/", + "start_location": { + "lat": 32.87336, + "lon": -117.22743, + "accuracy_m": 11 + }, + "start_address": { + "name": "123 Fake St, Nowhere, NV, USA", + "display_name": "Fake St, Nowhere, NV", + "street_number": "123", + "street_name": "Fake St", + "city": "Nowhere", + "state": "NV", + "country": "US" + }, + "end_location": { + "lat": 32.82336, + "lon": -117.23743, + "accuracy_m": 10 + }, + "end_address": { + "name": "321 Fake St, Nowhere, NV, USA", + "display_name": "Fake St, Nowhere, NV", + "street_number": "Unknown", + "street_name": "Fake St", + "city": "Nowhere", + "state": "NV", + "country": "US" + }, + "path": "path", + "vehicle_events": [], + "start_timezone": "America/Denver", + "end_timezone": "America/Denver", + "idling_time_s": 0, + "tags": [] + } + ] + }, 200) + else: + _LOGGER.debug('UNKNOWN ROUTE') + + +class TestAutomatic(unittest.TestCase): + """Test cases around the automatic device scanner.""" + + def see_mock(self, **kwargs): + """Mock see function.""" + self.assertEqual('vid', kwargs.get('dev_id')) + self.assertEqual(FUEL_LEVEL, + kwargs.get('attributes', {}).get('fuel_level')) + self.assertEqual((LATITUDE, LONGITUDE), kwargs.get('gps')) + self.assertEqual(ACCURACY, kwargs.get('gps_accuracy')) + + def setUp(self): + """Set up test data.""" + + def tearDown(self): + """Tear down test data.""" + + @patch('requests.get', side_effect=mocked_requests) + @patch('requests.post', side_effect=mocked_requests) + def test_invalid_credentials(self, mock_get, mock_post): + """Test error is raised with invalid credentials.""" + config = { + 'platform': 'automatic', + 'username': INVALID_USERNAME, + 'password': PASSWORD, + 'client_id': CLIENT_ID, + 'secret': CLIENT_SECRET + } + + self.assertFalse(setup_scanner(None, config, self.see_mock)) + + @patch('requests.get', side_effect=mocked_requests) + @patch('requests.post', side_effect=mocked_requests) + def test_valid_credentials(self, mock_get, mock_post): + """Test error is raised with invalid credentials.""" + config = { + 'platform': 'automatic', + 'username': VALID_USERNAME, + 'password': PASSWORD, + 'client_id': CLIENT_ID, + 'secret': CLIENT_SECRET + } + + self.assertTrue(setup_scanner(None, config, self.see_mock)) + + @patch('requests.get', side_effect=mocked_requests) + @patch('requests.post', side_effect=mocked_requests) + def test_device_attributes(self, mock_get, mock_post): + """Test device attributes are set on load.""" + config = { + 'platform': 'automatic', + 'username': VALID_USERNAME, + 'password': PASSWORD, + 'client_id': CLIENT_ID, + 'secret': CLIENT_SECRET + } + + scanner = AutomaticDeviceScanner(config, self.see_mock) + + self.assertEqual(DISPLAY_NAME, scanner.get_device_name('vid'))