1
0
mirror of https://github.com/krateng/maloja.git synced 2023-08-10 21:12:55 +03:00
maloja/compliant_api.py

215 lines
6.3 KiB
Python
Raw Normal View History

import datetime
import hashlib
import itertools
import random
import secrets
import sys

from bottle import response
from doreah.logging import log

import database
from cleanup import CleanerAgent
## GNU-FM-compliant scrobbling
# Shared CleanerAgent instance; scrobbletrack() calls its fullclean() method
# on the artist and title strings of every incoming scrobble.
cla = CleanerAgent()
def md5(input):
    """Return the hexadecimal MD5 digest of the string *input* (UTF-8 encoded)."""
    encoded = bytes(input, encoding="utf-8")
    return hashlib.md5(encoded).hexdigest()
2019-05-12 19:39:46 +03:00
def generate_key(ls):
    """Create a random 64-character alphanumeric key and register it in *ls*.

    Args:
        ls: Mutable list of currently valid keys; the new key is appended.

    Returns:
        The newly generated key (digits, lowercase and uppercase ASCII letters).
    """
    alphabet = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
    # The key is handed out as a session credential, so it is drawn from the
    # OS CSPRNG via `secrets` instead of the predictable `random` generator.
    key = "".join(secrets.choice(alphabet) for _ in range(64))
    ls.append(key)
    return key
#def check_sig(keys):
# try:
# sig = keys.pop("api_sig")
# text = "".join([key + keys[key] for key in sorted(keys.keys())]) + # secret
# assert sig == md5(text)
# return True
# except:
# return False
# Registry of protocol handler instances, keyed by (API name, version).
handlers = {}


def handler(apiname, version):
    """Build a class decorator that registers an instance under (apiname, version).

    The decorated class is instantiated without arguments, the instance is
    stored in `handlers`, and the class itself is returned unchanged.
    """
    registry_key = (apiname, version)

    def register(cls):
        handlers[registry_key] = cls()
        return cls

    return register
2019-05-23 14:13:42 +03:00
def handle(path, keys):
    """Dispatch an API request to the handler registered for its protocol.

    Args:
        path: Sequence of path nodes. The first two select the handler
            (API name and version); the remainder is passed on to it.
        keys: Request parameters; must support iteration over its keys
            and ``.get``.

    Returns:
        The response body produced by the handler, or an error dict. The
        HTTP status is set on bottle's ``response`` as a side effect.
    """
    # NOTE(review): this prints every request parameter, which can include
    # passwords and API keys - consider removing or redacting.
    print("API request: " + str(path))
    print("Keys:")
    for k in keys:
        print("\t", k, ":", keys.get(k))

    if len(path) > 1 and (path[0], path[1]) in handlers:
        api = handlers[(path[0], path[1])]
        try:
            response.status, result = api.handle(path[2:], keys)
        except Exception as exc:
            # Handlers only map the exception types they know about; anything
            # else used to raise a second KeyError from inside this except
            # block. Answer those with a generic server error instead.
            fallback = (500, {"error": "Unknown error"})
            response.status, result = api.errors.get(type(exc), fallback)
    else:
        result = {"error": "Invalid scrobble protocol"}
        response.status = 500

    print("Response: " + str(result))
    return result
2019-05-15 11:11:41 +03:00
def scrobbletrack(artiststr, titlestr, timestamp):
    """Clean the raw artist/title strings and store the scrobble.

    Args:
        artiststr: Raw artist string as submitted by the client.
        titlestr: Raw track title string as submitted by the client.
        timestamp: Time of the listen, passed through unchanged to
            ``database.createScrobble`` (callers in this file pass an int).

    Raises:
        ScrobblingException: If logging, cleaning or any database step
            fails; the original error is attached as ``__cause__``.
    """
    try:
        log("Incoming scrobble (compliant API): ARTISTS: " + artiststr + ", TRACK: " + titlestr, module="debug")
        (artists, title) = cla.fullclean(artiststr, titlestr)
        database.createScrobble(artists, title, timestamp)
        database.sync()
    except Exception as exc:
        # `except Exception` leaves KeyboardInterrupt/SystemExit alone, and
        # chaining keeps the real cause visible in tracebacks.
        raise ScrobblingException() from exc
class BadAuthException(Exception):
    """Raised when a request does not carry the credentials it needs."""


class InvalidAuthException(Exception):
    """Raised when supplied credentials match none of the known API keys."""


class InvalidMethodException(Exception):
    """Raised when the requested API method cannot be resolved."""


class InvalidSessionKey(Exception):
    """Raised when the session key is missing or not a known session."""


class MalformedJSONException(Exception):
    """Raised when a submitted listen document cannot be processed."""


class ScrobblingException(Exception):
    """Raised when storing a scrobble fails."""
class APIHandler:
    """Base class for protocol handlers; every subclass is a singleton.

    Subclasses are expected to provide ``self.methods`` (mapping of method
    name to a callable taking ``(pathnodes, keys)``) and a
    ``get_method(pathnodes, keys)`` method returning the method name.
    """

    # make these classes singletons
    _instance = None

    def __new__(cls, *args, **kwargs):
        # isinstance() rather than a None check: a subclass must not reuse an
        # instance that a parent class stored on the inherited attribute.
        if not isinstance(cls._instance, cls):
            # object.__new__ rejects extra arguments once __new__ is
            # overridden, so constructor arguments are not forwarded here;
            # __init__ still receives them.
            cls._instance = super().__new__(cls)
        # Python runs __init__ again on every call, even though the same
        # instance is returned.
        return cls._instance

    def handle(self, pathnodes, keys):
        """Resolve the requested method and call it with (pathnodes, keys).

        Returns:
            Whatever the resolved method returns.

        Raises:
            InvalidMethodException: If the method name cannot be determined
                or is not present in ``self.methods``.
        """
        try:
            methodname = self.get_method(pathnodes, keys)
            method = self.methods[methodname]
        except Exception as exc:
            raise InvalidMethodException() from exc
        return method(pathnodes, keys)
@handler("audioscrobbler", "2.0")
@handler("gnufm", "2.0")
@handler("gnukebox", "2.0")
class GNUFM2(APIHandler):
    """Handler for the GNU FM / Audioscrobbler 2.0 style scrobbling API.

    Supports ``auth.getMobileSession`` and ``track.scrobble``. ``errors``
    maps exception types to the ``(status, body)`` pair sent to the client.
    """

    def __init__(self):
        # no need to save these on disk, clients can always request a new session
        self.mobile_sessions = []
        self.methods = {
            "auth.getMobileSession": self.authmobile,
            "track.scrobble": self.scrobble,
        }
        self.errors = {
            BadAuthException: (400, {"error": 6, "message": "Requires authentication"}),
            InvalidAuthException: (401, {"error": 4, "message": "Invalid credentials"}),
            InvalidMethodException: (200, {"error": 3, "message": "Invalid method"}),
            InvalidSessionKey: (403, {"error": 9, "message": "Invalid session key"}),
            ScrobblingException: (500, {"error": 8, "message": "Operation failed"}),
        }

    def get_method(self, pathnodes, keys):
        """Return the method name taken from the ``method`` request parameter."""
        return keys.get("method")

    def authmobile(self, pathnodes, keys):
        """Open a new session for a client that presents a valid API key.

        Returns:
            ``(200, {"session": {"key": <new session key>}})``.

        Raises:
            InvalidAuthException: If the password or token matches no API key.
            BadAuthException: If neither username+password nor
                username+authToken was supplied.
        """
        token = keys.get("authToken")
        user = keys.get("username")
        password = keys.get("password")

        # either username and password
        if user is not None and password is not None:
            if password not in database.allAPIkeys():
                raise InvalidAuthException()
        # or username and token (deprecated by lastfm)
        elif user is not None and token is not None:
            if not any(md5(user + md5(key)) == token for key in database.allAPIkeys()):
                raise InvalidAuthException()
        else:
            raise BadAuthException()

        sessionkey = generate_key(self.mobile_sessions)
        return 200, {"session": {"key": sessionkey}}

    def scrobble(self, pathnodes, keys):
        """Store one scrobble, or up to 50 indexed scrobbles, from *keys*.

        A single scrobble uses the plain ``artist``/``track``/``timestamp``
        parameters; a batch uses ``artist[i]``/``track[i]``/``timestamp[i]``
        for ``i`` in 0..49.

        Raises:
            InvalidSessionKey: If ``sk`` is missing or not a known session.
            ScrobblingException: Propagated from ``scrobbletrack``.
        """
        if keys.get("sk") is None or keys.get("sk") not in self.mobile_sessions:
            raise InvalidSessionKey()

        if "track" in keys and "artist" in keys:
            self._scrobble_entry(keys, "")
        else:
            for num in range(50):
                suffix = "[" + str(num) + "]"
                if "track" + suffix in keys:
                    self._scrobble_entry(keys, suffix)
        return 200, {"scrobbles": {"@attr": {"ignored": 0}}}

    def _scrobble_entry(self, keys, suffix):
        """Scrobble the entry whose parameter names end in *suffix*."""
        artiststr, titlestr = keys["artist" + suffix], keys["track" + suffix]
        try:
            timestamp = int(keys["timestamp" + suffix])
        except (KeyError, TypeError, ValueError):
            # A missing or non-numeric timestamp used to escape as an
            # exception type absent from `errors`; fall back to the current
            # time, as the ListenBrainz handler in this file already does.
            timestamp = int(datetime.datetime.now(tz=datetime.timezone.utc).timestamp())
        scrobbletrack(artiststr, titlestr, timestamp)
@handler("listenbrainz", "1")
@handler("lbrnz", "1")
class LBrnz1(APIHandler):
    """Handler for the ListenBrainz v1 style ``submit-listens`` endpoint.

    ``errors`` maps exception types to the ``(status, body)`` pair sent to
    the client.
    """

    def __init__(self):
        self.methods = {
            "submit-listens": self.submit,
        }
        self.errors = {
            BadAuthException: (401, {"code": 401, "error": "You need to provide an Authorization header."}),
            InvalidAuthException: (401, {"code": 401, "error": "Incorrect Authorization"}),
            InvalidMethodException: (200, {"code": 200, "error": "Invalid Method"}),
            MalformedJSONException: (400, {"code": 400, "error": "Invalid JSON document submitted."}),
            ScrobblingException: (500, {"code": 500, "error": "Unspecified server error."}),
        }

    def get_method(self, pathnodes, keys):
        """Return the method name: the first path node, removed from *pathnodes*."""
        print(pathnodes)
        return pathnodes.pop(0)

    def submit(self, pathnodes, keys):
        """Store the listens in ``keys["payload"]``.

        Only the listen types ``single`` and ``import`` are stored; any other
        type is acknowledged without storing anything. A listen without a
        usable ``listened_at`` is stored with the current UTC time.

        Returns:
            ``(200, {"code": 200, "status": "ok"})``.

        Raises:
            BadAuthException: If no usable ``Authorization`` value is present.
            InvalidAuthException: If the token is not a known API key.
            MalformedJSONException: If the document lacks the expected fields.
            ScrobblingException: Propagated from ``scrobbletrack``.
        """
        try:
            token = keys.get("Authorization").replace("token ", "").replace("Token ", "").strip()
        except Exception as exc:
            raise BadAuthException() from exc

        if token not in database.allAPIkeys():
            raise InvalidAuthException()

        try:
            if keys["listen_type"] in ["single", "import"]:
                for listen in keys["payload"]:
                    metadata = listen["track_metadata"]
                    artiststr, titlestr = metadata["artist_name"], metadata["track_name"]
                    try:
                        timestamp = int(listen["listened_at"])
                    except (KeyError, TypeError, ValueError, OverflowError):
                        timestamp = int(datetime.datetime.now(tz=datetime.timezone.utc).timestamp())
                    scrobbletrack(artiststr, titlestr, timestamp)
            return 200, {"code": 200, "status": "ok"}
        except ScrobblingException:
            # A storage failure is a server error; do not let the handler
            # below relabel it as a malformed client document.
            raise
        except Exception as exc:
            raise MalformedJSONException() from exc