Remove unused library

Remove the library used for dev
This commit is contained in:
hydreliox 2016-01-11 05:35:26 +01:00
parent 7b999e6cd1
commit 1f1a46a8bd

View File

@ -1,305 +0,0 @@
# Published Jan 2013
# Revised Jan 2014 (to add new modules data)
# Author : Philippe Larduinat, philippelt@users.sourceforge.net
# Public domain source code
# This API provides access to the Netatmo (Internet weather station) devices
# This package can be used with Python2 or Python3 applications and do not
# require anything else than standard libraries
# PythonAPI Netatmo REST data access
# coding=utf-8
from sys import version_info
import json, time
# HTTP libraries depends upon Python 2 or 3
if version_info.major == 3 :
import urllib.parse, urllib.request
else:
from urllib import urlencode
import urllib2
######################## USER SPECIFIC INFORMATION ######################
# To be able to have a program accessing your netatmo data, you have to register your program as
# a Netatmo app in your Netatmo account. All you have to do is to give it a name (whatever) and you will be
# returned a client_id and secret that your app has to supply to access netatmo servers.
_CLIENT_ID = "55716d9c1b77591e138b4747" # Your client ID from Netatmo app registration at http://dev.netatmo.com/dev/listapps
_CLIENT_SECRET = "OA4Eb0oZW3B6YyR9wNh2HMkri2wV8g" # Your client app secret ' '
_USERNAME = "hydreliox@gmail.com" # Your netatmo account username
_PASSWORD = "netatmo@gTV7y5Te" # Your netatmo account password
#########################################################################
# Common definitions
_BASE_URL = "https://api.netatmo.net/"
_AUTH_REQ = _BASE_URL + "oauth2/token"
_GETUSER_REQ = _BASE_URL + "api/getuser"
_DEVICELIST_REQ = _BASE_URL + "api/devicelist"
_GETMEASURE_REQ = _BASE_URL + "api/getmeasure"
class ClientAuth:
"Request authentication and keep access token available through token method. Renew it automatically if necessary"
def __init__(self, clientId=_CLIENT_ID,
clientSecret=_CLIENT_SECRET,
username=_USERNAME,
password=_PASSWORD):
postParams = {
"grant_type" : "password",
"client_id" : clientId,
"client_secret" : clientSecret,
"username" : username,
"password" : password,
"scope" : "read_station"
}
resp = postRequest(_AUTH_REQ, postParams)
self._clientId = clientId
self._clientSecret = clientSecret
self._accessToken = resp['access_token']
self.refreshToken = resp['refresh_token']
self._scope = resp['scope']
self.expiration = int(resp['expire_in'] + time.time())
@property
def accessToken(self):
if self.expiration < time.time(): # Token should be renewed
postParams = {
"grant_type" : "refresh_token",
"refresh_token" : self.refreshToken,
"client_id" : self._clientId,
"client_secret" : self._clientSecret
}
resp = postRequest(_AUTH_REQ, postParams)
self._accessToken = resp['access_token']
self.refreshToken = resp['refresh_token']
self.expiration = int(resp['expire_in'] + time.time())
return self._accessToken
class User:
def __init__(self, authData):
postParams = {
"access_token" : authData.accessToken
}
resp = postRequest(_GETUSER_REQ, postParams)
self.rawData = resp['body']
self.id = self.rawData['_id']
self.devList = self.rawData['devices']
self.ownerMail = self.rawData['mail']
class DeviceList:
def __init__(self, authData):
self.getAuthToken = authData.accessToken
postParams = {
"access_token" : self.getAuthToken,
"app_type" : "app_station"
}
resp = postRequest(_DEVICELIST_REQ, postParams)
self.rawData = resp['body']
self.stations = { d['_id'] : d for d in self.rawData['devices'] }
self.modules = { m['_id'] : m for m in self.rawData['modules'] }
self.default_station = list(self.stations.values())[0]['station_name']
def modulesNamesList(self, station=None):
res = [m['module_name'] for m in self.modules.values()]
res.append(self.stationByName(station)['module_name'])
return res
def stationByName(self, station=None):
if not station : station = self.default_station
for i,s in self.stations.items():
if s['station_name'] == station : return self.stations[i]
return None
def stationById(self, sid):
return None if sid not in self.stations else self.stations[sid]
def moduleByName(self, module, station=None):
s = None
if station :
s = self.stationByName(station)
if not s : return None
for m in self.modules:
mod = self.modules[m]
if mod['module_name'] == module :
if not s or mod['main_device'] == s['_id'] : return mod
return None
def moduleById(self, mid, sid=None):
s = self.stationById(sid) if sid else None
if mid in self.modules :
return self.modules[mid] if not s or self.modules[mid]['main_device'] == s['_id'] else None
def lastData(self, station=None, exclude=0):
s = self.stationByName(station)
if not s : return None
lastD = dict()
# Define oldest acceptable sensor measure event
limit = (time.time() - exclude) if exclude else 0
ds = s['dashboard_data']
if ds['time_utc'] > limit :
lastD[s['module_name']] = ds.copy()
lastD[s['module_name']]['When'] = lastD[s['module_name']].pop("time_utc")
lastD[s['module_name']]['wifi_status'] = s['wifi_status']
for mId in s["modules"]:
ds = self.modules[mId]['dashboard_data']
if ds['time_utc'] > limit :
mod = self.modules[mId]
lastD[mod['module_name']] = ds.copy()
lastD[mod['module_name']]['When'] = lastD[mod['module_name']].pop("time_utc")
# For potential use, add battery and radio coverage information to module data if present
for i in ('battery_vp', 'rf_status') :
if i in mod : lastD[mod['module_name']][i] = mod[i]
return lastD
def checkNotUpdated(self, station=None, delay=3600):
res = self.lastData(station)
ret = []
for mn,v in res.items():
if time.time()-v['When'] > delay : ret.append(mn)
return ret if ret else None
def checkUpdated(self, station=None, delay=3600):
res = self.lastData(station)
ret = []
for mn,v in res.items():
if time.time()-v['When'] < delay : ret.append(mn)
return ret if ret else None
def getMeasure(self, device_id, scale, mtype, module_id=None, date_begin=None, date_end=None, limit=None, optimize=False, real_time=False):
postParams = { "access_token" : self.getAuthToken }
postParams['device_id'] = device_id
if module_id : postParams['module_id'] = module_id
postParams['scale'] = scale
postParams['type'] = mtype
if date_begin : postParams['date_begin'] = date_begin
if date_end : postParams['date_end'] = date_end
if limit : postParams['limit'] = limit
postParams['optimize'] = "true" if optimize else "false"
postParams['real_time'] = "true" if real_time else "false"
return postRequest(_GETMEASURE_REQ, postParams)
def MinMaxTH(self, station=None, module=None, frame="last24"):
if not station : station = self.default_station
s = self.stationByName(station)
if not s :
s = self.stationById(station)
if not s : return None
if frame == "last24":
end = time.time()
start = end - 24*3600 # 24 hours ago
elif frame == "day":
start, end = todayStamps()
if module and module != s['module_name']:
m = self.moduleByName(module, s['station_name'])
if not m :
m = self.moduleById(s['_id'], module)
if not m : return None
# retrieve module's data
resp = self.getMeasure(
device_id = s['_id'],
module_id = m['_id'],
scale = "max",
mtype = "Temperature,Humidity",
date_begin = start,
date_end = end)
else : # retrieve station's data
resp = self.getMeasure(
device_id = s['_id'],
scale = "max",
mtype = "Temperature,Humidity",
date_begin = start,
date_end = end)
if resp:
T = [v[0] for v in resp['body'].values()]
H = [v[1] for v in resp['body'].values()]
return min(T), max(T), min(H), max(H)
else:
return None
# Utilities routines
def postRequest(url, params):
if version_info.major == 3:
req = urllib.request.Request(url)
req.add_header("Content-Type","application/x-www-form-urlencoded;charset=utf-8")
params = urllib.parse.urlencode(params).encode('utf-8')
resp = urllib.request.urlopen(req, params).read().decode("utf-8")
else:
params = urlencode(params)
headers = {"Content-Type" : "application/x-www-form-urlencoded;charset=utf-8"}
req = urllib2.Request(url=url, data=params, headers=headers)
resp = urllib2.urlopen(req).read()
return json.loads(resp)
def toTimeString(value):
return time.strftime("%Y-%m-%d_%H:%M:%S", time.localtime(int(value)))
def toEpoch(value):
return int(time.mktime(time.strptime(value,"%Y-%m-%d_%H:%M:%S")))
def todayStamps():
today = time.strftime("%Y-%m-%d")
today = int(time.mktime(time.strptime(today,"%Y-%m-%d")))
return today, today+3600*24
# Global shortcut
def getStationMinMaxTH(station=None, module=None):
authorization = ClientAuth()
devList = DeviceList(authorization)
if not station : station = devList.default_station
if module :
mname = module
else :
mname = devList.stationByName(station)['module_name']
lastD = devList.lastData(station)
if mname == "*":
result = dict()
for m in lastD.keys():
if time.time()-lastD[m]['When'] > 3600 : continue
r = devList.MinMaxTH(module=m)
result[m] = (r[0], lastD[m]['Temperature'], r[1])
else:
if time.time()-lastD[mname]['When'] > 3600 : result = ["-", "-"]
else : result = [lastD[mname]['Temperature'], lastD[mname]['Humidity']]
result.extend(devList.MinMaxTH(station, mname))
return result
# auto-test when executed directly
if __name__ == "__main__":
from sys import exit, stdout, stderr
if not _CLIENT_ID or not _CLIENT_SECRET or not _USERNAME or not _PASSWORD :
stderr.write("Library source missing identification arguments to check lnetatmo.py (user/password/etc...)")
exit(1)
authorization = ClientAuth() # Test authentication method
user = User(authorization) # Test GETUSER
devList = DeviceList(authorization) # Test DEVICELIST
devList.MinMaxTH() # Test GETMEASURE
# If we reach this line, all is OK
# If launched interactively, display OK message
if stdout.isatty():
print("lnetatmo.py : OK")
exit(0)