First cut mobile beacon tracking.

This commit is contained in:
pavoni 2016-01-29 11:49:44 +00:00
parent c5de42e7b5
commit 80d2f35cc5
2 changed files with 102 additions and 4 deletions

View File

@ -17,6 +17,9 @@ from homeassistant.const import STATE_HOME
DEPENDENCIES = ['mqtt']
REGIONS_ENTERED = defaultdict(list)
MOBILE_BEACONS_ACTIVE = defaultdict(list)
BEACON_DEV_ID = 'beacon'
LOCATION_TOPIC = 'owntracks/+/+'
EVENT_TOPIC = 'owntracks/+/+/event'
@ -56,6 +59,7 @@ def setup_scanner(hass, config, see):
return
see(**kwargs)
see_beacons(dev_id, kwargs)
def owntracks_event_update(topic, payload, qos):
# pylint: disable=too-many-branches
@ -85,7 +89,13 @@ def setup_scanner(hass, config, see):
if data['event'] == 'enter':
zone = hass.states.get("zone.{}".format(location))
with LOCK:
if zone is not None:
if zone is None:
if data['t'] == 'b':
# Not a HA zone, and a beacon so assume mobile
MOBILE_BEACONS_ACTIVE[dev_id].append(location)
else:
# Normal region
kwargs['location_name'] = location
regions = REGIONS_ENTERED[dev_id]
@ -95,6 +105,7 @@ def setup_scanner(hass, config, see):
_set_gps_from_zone(kwargs, zone)
see(**kwargs)
see_beacons(dev_id, kwargs)
elif data['event'] == 'leave':
regions = REGIONS_ENTERED[dev_id]
@ -113,6 +124,11 @@ def setup_scanner(hass, config, see):
_LOGGER.info("Exit from %s to GPS", location)
see(**kwargs)
see_beacons(dev_id, kwargs)
beacons = MOBILE_BEACONS_ACTIVE[dev_id]
if location in beacons:
beacons.remove(location)
else:
_LOGGER.error(
@ -120,6 +136,14 @@ def setup_scanner(hass, config, see):
data['event'])
return
def see_beacons(dev_id, kwargs):
# Live beacons should be set to the same location
_LOGGER.info(MOBILE_BEACONS_ACTIVE)
for beacon in MOBILE_BEACONS_ACTIVE[dev_id]:
kwargs['dev_id'] = beacon
kwargs['host_name'] = BEACON_DEV_ID
see(**kwargs)
mqtt.subscribe(hass, LOCATION_TOPIC, owntracks_location_update, 1)
mqtt.subscribe(hass, EVENT_TOPIC, owntracks_event_update, 1)

View File

@ -27,6 +27,9 @@ EVENT_TOPIC = "owntracks/{}/{}/event".format(USER, DEVICE)
DEVICE_TRACKER_STATE = "device_tracker.{}_{}".format(USER, DEVICE)
IBEACON_DEVICE = 'keys'
REGION_TRACKER_STATE = "device_tracker.{}".format(IBEACON_DEVICE)
LOCATION_MESSAGE = {
'batt': 92,
'cog': 248,
@ -109,6 +112,7 @@ class TestDeviceTrackerOwnTracks(unittest.TestCase):
# Clear state between teste
self.hass.states.set(DEVICE_TRACKER_STATE, None)
owntracks.REGIONS_ENTERED = defaultdict(list)
owntracks.MOBILE_BEACONS_ACTIVE = defaultdict(list)
def teardown_method(self, method):
""" Stop down stuff we started. """
@ -136,6 +140,18 @@ class TestDeviceTrackerOwnTracks(unittest.TestCase):
state = self.hass.states.get(DEVICE_TRACKER_STATE)
self.assertEqual(state.attributes.get('gps_accuracy'), accuracy)
def assert_tracker_state(self, location):
state = self.hass.states.get(REGION_TRACKER_STATE)
self.assertEqual(state.state, location)
def assert_tracker_latitude(self, latitude):
state = self.hass.states.get(REGION_TRACKER_STATE)
self.assertEqual(state.attributes.get('latitude'), latitude)
def assert_tracker_accuracy(self, accuracy):
state = self.hass.states.get(REGION_TRACKER_STATE)
self.assertEqual(state.attributes.get('gps_accuracy'), accuracy)
def test_location_update(self):
self.send_message(LOCATION_TOPIC, LOCATION_MESSAGE)
@ -189,7 +205,6 @@ class TestDeviceTrackerOwnTracks(unittest.TestCase):
self.assert_location_latitude(2.1)
self.assert_location_accuracy(10.0)
# Enter inner2 zone
message = REGION_ENTER_MESSAGE.copy()
message['desc'] = "inner_2"
@ -198,7 +213,6 @@ class TestDeviceTrackerOwnTracks(unittest.TestCase):
self.assert_location_latitude(2.1)
self.assert_location_accuracy(10.0)
# Exit inner_2 - should be in 'inner'
message = REGION_LEAVE_MESSAGE.copy()
message['desc'] = "inner_2"
@ -213,7 +227,6 @@ class TestDeviceTrackerOwnTracks(unittest.TestCase):
self.assert_location_latitude(2.0)
self.assert_location_accuracy(60.0)
def test_event_entry_exit_wrong_order(self):
# Enter inner zone
self.send_message(EVENT_TOPIC, REGION_ENTER_MESSAGE)
@ -259,3 +272,64 @@ class TestDeviceTrackerOwnTracks(unittest.TestCase):
self.send_message(EVENT_TOPIC, REGION_ENTER_MESSAGE)
self.assert_location_state('inner')
def test_mobile_enter_move_beacon(self):
# Enter mobile beacon, should set location
message = REGION_ENTER_MESSAGE.copy()
message['desc'] = IBEACON_DEVICE
self.send_message(EVENT_TOPIC, message)
self.assert_tracker_latitude(2.0)
self.assert_tracker_state('outer')
# Move should move beacon
message = LOCATION_MESSAGE.copy()
message['lat'] = "3.0"
self.send_message(LOCATION_TOPIC, message)
self.assert_tracker_latitude(3.0)
self.assert_tracker_state(STATE_NOT_HOME)
def test_mobile_enter_exit_region_beacon(self):
# Start tracking beacon
message = REGION_ENTER_MESSAGE.copy()
message['desc'] = IBEACON_DEVICE
self.send_message(EVENT_TOPIC, message)
self.assert_tracker_latitude(2.0)
self.assert_tracker_state('outer')
# Enter location should move beacon
message = REGION_ENTER_MESSAGE.copy()
message['desc'] = "inner_2"
self.send_message(EVENT_TOPIC, message)
self.assert_tracker_latitude(2.1)
self.assert_tracker_state('inner_2')
# Exit location should switch to gps
message = REGION_LEAVE_MESSAGE.copy()
message['desc'] = "inner_2"
self.send_message(EVENT_TOPIC, message)
self.assert_tracker_latitude(2.0)
def test_mobile_exit_move_beacon(self):
# Start tracking beacon
message = REGION_ENTER_MESSAGE.copy()
message['desc'] = IBEACON_DEVICE
self.send_message(EVENT_TOPIC, message)
self.assert_tracker_latitude(2.0)
self.assert_tracker_state('outer')
# Exit mobile beacon, should set location
message = REGION_LEAVE_MESSAGE.copy()
message['desc'] = IBEACON_DEVICE
message['lat'] = "3.0"
self.send_message(EVENT_TOPIC, message)
self.assert_tracker_latitude(3.0)
# Move after exit should do nothing
message = LOCATION_MESSAGE.copy()
message['lat'] = "4.0"
self.send_message(LOCATION_TOPIC, LOCATION_MESSAGE)
self.assert_tracker_latitude(3.0)