Automatic ODB device tracker & device tracker attributes (#3035)

This commit is contained in:
Teagan Glenn 2016-09-03 10:38:17 -06:00 committed by Johann Kellerman
parent a08ac85971
commit 601395bc12
3 changed files with 434 additions and 6 deletions

View File

@ -62,6 +62,7 @@ ATTR_HOST_NAME = 'host_name'
ATTR_LOCATION_NAME = 'location_name'
ATTR_GPS = 'gps'
ATTR_BATTERY = 'battery'
ATTR_ATTRIBUTES = 'attributes'
PLATFORM_SCHEMA = cv.PLATFORM_SCHEMA.extend({
vol.Optional(CONF_SCAN_INTERVAL): cv.positive_int, # seconds
@ -86,10 +87,11 @@ def is_on(hass: HomeAssistantType, entity_id: str=None):
return hass.states.is_state(entity, STATE_HOME)
# pylint: disable=too-many-arguments
def see(hass: HomeAssistantType, mac: str=None, dev_id: str=None,
host_name: str=None, location_name: str=None,
gps: GPSType=None, gps_accuracy=None,
battery=None): # pylint: disable=too-many-arguments
battery=None, attributes: dict=None):
"""Call service to notify you see device."""
data = {key: value for key, value in
((ATTR_MAC, mac),
@ -99,6 +101,9 @@ def see(hass: HomeAssistantType, mac: str=None, dev_id: str=None,
(ATTR_GPS, gps),
(ATTR_GPS_ACCURACY, gps_accuracy),
(ATTR_BATTERY, battery)) if value is not None}
if attributes:
for key, value in attributes:
data[key] = value
hass.services.call(DOMAIN, SERVICE_SEE, data)
@ -164,7 +169,7 @@ def setup(hass: HomeAssistantType, config: ConfigType):
"""Service to see a device."""
args = {key: value for key, value in call.data.items() if key in
(ATTR_MAC, ATTR_DEV_ID, ATTR_HOST_NAME, ATTR_LOCATION_NAME,
ATTR_GPS, ATTR_GPS_ACCURACY, ATTR_BATTERY)}
ATTR_GPS, ATTR_GPS_ACCURACY, ATTR_BATTERY, ATTR_ATTRIBUTES)}
tracker.see(**args)
descriptions = load_yaml_config_file(
@ -202,7 +207,7 @@ class DeviceTracker(object):
def see(self, mac: str=None, dev_id: str=None, host_name: str=None,
location_name: str=None, gps: GPSType=None, gps_accuracy=None,
battery: str=None):
battery: str=None, attributes: dict=None):
"""Notify the device tracker that you see a device."""
with self.lock:
if mac is None and dev_id is None:
@ -218,7 +223,7 @@ class DeviceTracker(object):
if device:
device.seen(host_name, location_name, gps, gps_accuracy,
battery)
battery, attributes)
if device.track:
device.update_ha_state()
return
@ -232,7 +237,8 @@ class DeviceTracker(object):
if mac is not None:
self.mac_to_dev[mac] = device
device.seen(host_name, location_name, gps, gps_accuracy, battery)
device.seen(host_name, location_name, gps, gps_accuracy, battery,
attributes)
if device.track:
device.update_ha_state()
@ -267,6 +273,7 @@ class Device(Entity):
gps_accuracy = 0
last_seen = None # type: dt_util.dt.datetime
battery = None # type: str
attributes = None # type: dict
# Track if the last update of this device was HOME.
last_update_home = False
@ -330,6 +337,10 @@ class Device(Entity):
if self.battery:
attr[ATTR_BATTERY] = self.battery
if self.attributes:
for key, value in self.attributes:
attr[key] = value
return attr
@property
@ -338,13 +349,15 @@ class Device(Entity):
return self.away_hide and self.state != STATE_HOME
def seen(self, host_name: str=None, location_name: str=None,
gps: GPSType=None, gps_accuracy=0, battery: str=None):
gps: GPSType=None, gps_accuracy=0, battery: str=None,
attributes: dict=None):
"""Mark the device as seen."""
self.last_seen = dt_util.utcnow()
self.host_name = host_name
self.location_name = location_name
self.gps_accuracy = gps_accuracy or 0
self.battery = battery
self.attributes = attributes
self.gps = None
if gps is not None:
try:

View File

@ -0,0 +1,161 @@
"""
Support for the Automatic platform.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/device_tracker.automatic/
"""
from datetime import timedelta
import logging
import re
import requests
import voluptuous as vol
from homeassistant.components.device_tracker import (PLATFORM_SCHEMA,
ATTR_ATTRIBUTES)
from homeassistant.const import CONF_USERNAME, CONF_PASSWORD
import homeassistant.helpers.config_validation as cv
from homeassistant.util import Throttle, datetime as dt_util
_LOGGER = logging.getLogger(__name__)
MIN_TIME_BETWEEN_SCANS = timedelta(seconds=30)
CONF_CLIENT_ID = 'client_id'
CONF_SECRET = 'secret'
CONF_DEVICES = 'devices'
SCOPE = 'scope:location scope:vehicle:profile scope:user:profile scope:trip'
ATTR_ACCESS_TOKEN = 'access_token'
ATTR_EXPIRES_IN = 'expires_in'
ATTR_RESULTS = 'results'
ATTR_VEHICLE = 'vehicle'
ATTR_ENDED_AT = 'ended_at'
ATTR_END_LOCATION = 'end_location'
URL_AUTHORIZE = 'https://accounts.automatic.com/oauth/access_token/'
URL_VEHICLES = 'https://api.automatic.com/vehicle/'
URL_TRIPS = 'https://api.automatic.com/trip/'
_VEHICLE_ID_REGEX = re.compile(
(URL_VEHICLES + '(.*)?[/]$').replace('/', r'\/'))
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_CLIENT_ID): cv.string,
vol.Required(CONF_SECRET): cv.string,
vol.Required(CONF_USERNAME): cv.string,
vol.Required(CONF_PASSWORD): cv.string,
vol.Optional(CONF_DEVICES): vol.All(cv.ensure_list, [cv.string])
})
def setup_scanner(hass, config: dict, see):
"""Validate the configuration and return an Automatic scanner."""
try:
AutomaticDeviceScanner(config, see)
except requests.HTTPError as err:
_LOGGER.error(str(err))
return False
return True
class AutomaticDeviceScanner(object):
"""A class representing an Automatic device."""
def __init__(self, config: dict, see) -> None:
"""Initialize the automatic device scanner."""
self._devices = config.get(CONF_DEVICES, None)
self._access_token_payload = {
'username': config.get(CONF_USERNAME),
'password': config.get(CONF_PASSWORD),
'client_id': config.get(CONF_CLIENT_ID),
'client_secret': config.get(CONF_SECRET),
'grant_type': 'password',
'scope': SCOPE
}
self._headers = None
self._token_expires = dt_util.now()
self.last_results = {}
self.last_trips = {}
self.see = see
self.scan_devices()
def scan_devices(self):
"""Scan for new devices and return a list with found device IDs."""
self._update_info()
return [item['id'] for item in self.last_results]
def get_device_name(self, device):
"""Get the device name from id."""
vehicle = [item['display_name'] for item in self.last_results
if item['id'] == device]
return vehicle[0]
def _update_headers(self):
"""Get the access token from automatic."""
if self._headers is None or self._token_expires <= dt_util.now():
resp = requests.post(
URL_AUTHORIZE,
data=self._access_token_payload)
resp.raise_for_status()
json = resp.json()
access_token = json[ATTR_ACCESS_TOKEN]
self._token_expires = dt_util.now() + timedelta(
seconds=json[ATTR_EXPIRES_IN])
self._headers = {
'Authorization': 'Bearer {}'.format(access_token)
}
@Throttle(MIN_TIME_BETWEEN_SCANS)
def _update_info(self) -> None:
"""Update the device info."""
_LOGGER.info('Updating devices')
self._update_headers()
response = requests.get(URL_VEHICLES, headers=self._headers)
response.raise_for_status()
self.last_results = [item for item in response.json()[ATTR_RESULTS]
if self._devices is None or item[
'display_name'] in self._devices]
response = requests.get(URL_TRIPS, headers=self._headers)
if response.status_code == 200:
for trip in response.json()[ATTR_RESULTS]:
vehicle_id = _VEHICLE_ID_REGEX.match(
trip[ATTR_VEHICLE]).group(1)
if vehicle_id not in self.last_trips:
self.last_trips[vehicle_id] = trip
elif self.last_trips[vehicle_id][ATTR_ENDED_AT] < trip[
ATTR_ENDED_AT]:
self.last_trips[vehicle_id] = trip
for vehicle in self.last_results:
dev_id = vehicle.get('id')
attrs = {
'fuel_level': vehicle.get('fuel_level_percent')
}
kwargs = {
'dev_id': dev_id,
'mac': dev_id,
ATTR_ATTRIBUTES: attrs
}
if dev_id in self.last_trips:
end_location = self.last_trips[dev_id][ATTR_END_LOCATION]
kwargs['gps'] = (end_location['lat'], end_location['lon'])
kwargs['gps_accuracy'] = end_location['accuracy_m']
self.see(**kwargs)

View File

@ -0,0 +1,254 @@
"""Test the automatic device tracker platform."""
import logging
import requests
import unittest
from unittest.mock import patch
from homeassistant.components.device_tracker.automatic import (
URL_AUTHORIZE, URL_VEHICLES, URL_TRIPS, setup_scanner,
AutomaticDeviceScanner)
_LOGGER = logging.getLogger(__name__)
INVALID_USERNAME = 'bob'
VALID_USERNAME = 'jim'
PASSWORD = 'password'
CLIENT_ID = '12345'
CLIENT_SECRET = '54321'
FUEL_LEVEL = 77.2
LATITUDE = 32.82336
LONGITUDE = -117.23743
ACCURACY = 8
DISPLAY_NAME = 'My Vehicle'
def mocked_requests(*args, **kwargs):
"""Mock requests.get invocations."""
class MockResponse:
"""Class to represent a mocked response."""
def __init__(self, json_data, status_code):
"""Initialize the mock response class."""
self.json_data = json_data
self.status_code = status_code
def json(self):
"""Return the json of the response."""
return self.json_data
@property
def content(self):
"""Return the content of the response."""
return self.json()
def raise_for_status(self):
"""Raise an HTTPError if status is not 200."""
if self.status_code != 200:
raise requests.HTTPError(self.status_code)
data = kwargs.get('data')
if data and data.get('username', None) == INVALID_USERNAME:
return MockResponse({
"error": "invalid_credentials"
}, 401)
elif str(args[0]).startswith(URL_AUTHORIZE):
return MockResponse({
"user": {
"sid": "sid",
"id": "id"
},
"token_type": "Bearer",
"access_token": "accesstoken",
"refresh_token": "refreshtoken",
"expires_in": 31521669,
"scope": ""
}, 200)
elif str(args[0]).startswith(URL_VEHICLES):
return MockResponse({
"_metadata": {
"count": 2,
"next": None,
"previous": None
},
"results": [
{
"url": "https://api.automatic.com/vehicle/vid/",
"id": "vid",
"created_at": "2016-03-05T20:05:16.240000Z",
"updated_at": "2016-08-29T01:52:59.597898Z",
"make": "Honda",
"model": "Element",
"year": 2007,
"submodel": "EX",
"display_name": DISPLAY_NAME,
"fuel_grade": "regular",
"fuel_level_percent": FUEL_LEVEL,
"active_dtcs": []
}]
}, 200)
elif str(args[0]).startswith(URL_TRIPS):
return MockResponse({
"_metadata": {
"count": 1594,
"next": "https://api.automatic.com/trip/?page=2",
"previous": None
},
"results": [
{
"url": "https://api.automatic.com/trip/tid1/",
"id": "tid1",
"driver": "https://api.automatic.com/user/uid/",
"user": "https://api.automatic.com/user/uid/",
"started_at": "2016-08-28T19:37:23.986000Z",
"ended_at": "2016-08-28T19:43:22.500000Z",
"distance_m": 3931.6,
"duration_s": 358.5,
"vehicle": "https://api.automatic.com/vehicle/vid/",
"start_location": {
"lat": 32.87336,
"lon": -117.22743,
"accuracy_m": 10
},
"start_address": {
"name": "123 Fake St, Nowhere, NV 12345",
"display_name": "123 Fake St, Nowhere, NV",
"street_number": "Unknown",
"street_name": "Fake St",
"city": "Nowhere",
"state": "NV",
"country": "US"
},
"end_location": {
"lat": LATITUDE,
"lon": LONGITUDE,
"accuracy_m": ACCURACY
},
"end_address": {
"name": "321 Fake St, Nowhere, NV 12345",
"display_name": "321 Fake St, Nowhere, NV",
"street_number": "Unknown",
"street_name": "Fake St",
"city": "Nowhere",
"state": "NV",
"country": "US"
},
"path": "path",
"vehicle_events": [],
"start_timezone": "America/Denver",
"end_timezone": "America/Denver",
"idling_time_s": 0,
"tags": []
},
{
"url": "https://api.automatic.com/trip/tid2/",
"id": "tid2",
"driver": "https://api.automatic.com/user/uid/",
"user": "https://api.automatic.com/user/uid/",
"started_at": "2016-08-28T18:48:00.727000Z",
"ended_at": "2016-08-28T18:55:25.800000Z",
"distance_m": 3969.1,
"duration_s": 445.1,
"vehicle": "https://api.automatic.com/vehicle/vid/",
"start_location": {
"lat": 32.87336,
"lon": -117.22743,
"accuracy_m": 11
},
"start_address": {
"name": "123 Fake St, Nowhere, NV, USA",
"display_name": "Fake St, Nowhere, NV",
"street_number": "123",
"street_name": "Fake St",
"city": "Nowhere",
"state": "NV",
"country": "US"
},
"end_location": {
"lat": 32.82336,
"lon": -117.23743,
"accuracy_m": 10
},
"end_address": {
"name": "321 Fake St, Nowhere, NV, USA",
"display_name": "Fake St, Nowhere, NV",
"street_number": "Unknown",
"street_name": "Fake St",
"city": "Nowhere",
"state": "NV",
"country": "US"
},
"path": "path",
"vehicle_events": [],
"start_timezone": "America/Denver",
"end_timezone": "America/Denver",
"idling_time_s": 0,
"tags": []
}
]
}, 200)
else:
_LOGGER.debug('UNKNOWN ROUTE')
class TestAutomatic(unittest.TestCase):
"""Test cases around the automatic device scanner."""
def see_mock(self, **kwargs):
"""Mock see function."""
self.assertEqual('vid', kwargs.get('dev_id'))
self.assertEqual(FUEL_LEVEL,
kwargs.get('attributes', {}).get('fuel_level'))
self.assertEqual((LATITUDE, LONGITUDE), kwargs.get('gps'))
self.assertEqual(ACCURACY, kwargs.get('gps_accuracy'))
def setUp(self):
"""Set up test data."""
def tearDown(self):
"""Tear down test data."""
@patch('requests.get', side_effect=mocked_requests)
@patch('requests.post', side_effect=mocked_requests)
def test_invalid_credentials(self, mock_get, mock_post):
"""Test error is raised with invalid credentials."""
config = {
'platform': 'automatic',
'username': INVALID_USERNAME,
'password': PASSWORD,
'client_id': CLIENT_ID,
'secret': CLIENT_SECRET
}
self.assertFalse(setup_scanner(None, config, self.see_mock))
@patch('requests.get', side_effect=mocked_requests)
@patch('requests.post', side_effect=mocked_requests)
def test_valid_credentials(self, mock_get, mock_post):
"""Test error is raised with invalid credentials."""
config = {
'platform': 'automatic',
'username': VALID_USERNAME,
'password': PASSWORD,
'client_id': CLIENT_ID,
'secret': CLIENT_SECRET
}
self.assertTrue(setup_scanner(None, config, self.see_mock))
@patch('requests.get', side_effect=mocked_requests)
@patch('requests.post', side_effect=mocked_requests)
def test_device_attributes(self, mock_get, mock_post):
"""Test device attributes are set on load."""
config = {
'platform': 'automatic',
'username': VALID_USERNAME,
'password': PASSWORD,
'client_id': CLIENT_ID,
'secret': CLIENT_SECRET
}
scanner = AutomaticDeviceScanner(config, self.see_mock)
self.assertEqual(DISPLAY_NAME, scanner.get_device_name('vid'))