Moved API key checking to proper module

This commit is contained in:
krateng 2022-01-06 05:19:56 +01:00
parent 40e733a054
commit 80acf6275f
6 changed files with 33 additions and 51 deletions

20
maloja/apis/_apikeys.py Normal file
View File

@ -0,0 +1,20 @@
from ..globalconf import apikeystore
# skip regular authentication if api key is present in request
# an api key now ONLY permits scrobbling tracks, no other admin tasks
def api_key_correct(request):
args = request.params
try:
args.update(request.json)
except:
pass
if "key" in args:
apikey = args.pop("key")
elif "apikey" in args:
apikey = args.pop("apikey")
else: return False
return checkAPIkey(apikey)
def checkAPIkey(key):
return apikeystore.check_key(key)
def allAPIkeys():
return [apikeystore[k] for k in apikeystore]

View File

@ -1,6 +1,7 @@
from ._base import APIHandler
from ._exceptions import *
from .. import database
from ._apikeys import checkAPIkey, allAPIkeys
class Audioscrobbler(APIHandler):
__apiname__ = "Audioscrobbler"
@ -36,14 +37,14 @@ class Audioscrobbler(APIHandler):
password = keys.get("password")
# either username and password
if user is not None and password is not None:
if password in database.allAPIkeys():
if checkAPIkey(password):
sessionkey = generate_key(self.mobile_sessions)
return 200,{"session":{"key":sessionkey}}
else:
raise InvalidAuthException()
# or username and token (deprecated by lastfm)
elif user is not None and token is not None:
for key in database.allAPIkeys():
for key in allAPIkeys():
if md5(user + md5(key)) == token:
sessionkey = generate_key(self.mobile_sessions)
return 200,{"session":{"key":sessionkey}}
@ -89,6 +90,6 @@ def generate_key(ls):
random.choice(
list(range(10)) + list("abcdefghijklmnopqrstuvwxyz") +
list("ABCDEFGHIJKLMNOPQRSTUVWXYZ"))) for _ in range(64))
ls.append(key)
return key

View File

@ -1,6 +1,7 @@
from ._base import APIHandler
from ._exceptions import *
from .. import database
from ._apikeys import checkAPIkey, allAPIkeys
from bottle import request
@ -41,7 +42,7 @@ class AudioscrobblerLegacy(APIHandler):
protocol = 'http' if (keys.get("u") == 'nossl') else request.urlparts.scheme
if auth is not None:
for key in database.allAPIkeys():
for key in allAPIkeys():
if check_token(auth, key, timestamp):
sessionkey = generate_key(self.mobile_sessions)
return 200, (

View File

@ -2,6 +2,7 @@ from ._base import APIHandler
from ._exceptions import *
from .. import database
import datetime
from ._apikeys import checkAPIkey
from ..globalconf import malojaconfig
@ -36,7 +37,7 @@ class Listenbrainz(APIHandler):
except:
raise BadAuthException()
if token not in database.allAPIkeys():
if not checkAPIkey(token):
raise InvalidAuthException()
try:
@ -69,7 +70,7 @@ class Listenbrainz(APIHandler):
token = self.get_token_from_request_keys(keys)
except:
raise BadAuthException()
if token not in database.allAPIkeys():
if not checkAPIkey(token):
raise InvalidAuthException()
else:
return 200,{"code":200,"message":"Token valid.","valid":True,"user_name":malojaconfig["NAME"]}

View File

@ -3,6 +3,7 @@ from ..globalconf import malojaconfig, apikeystore
from ..__pkginfo__ import VERSION
from ..malojauri import uri_to_internal
from .. import utilities
from ._apikeys import api_key_correct
from bottle import response, static_file
@ -15,6 +16,9 @@ api = API(delay=True)
api.__apipath__ = "mlj_1"
@api.get("test")
def test_server(key=None):
"""Pings the server. If an API key is supplied, the server will respond with 200

View File

@ -42,7 +42,6 @@ import urllib
dblock = Lock() #global database lock
dbstatus = {
"healthy":False,
"rebuildinprogress":False,
@ -69,34 +68,6 @@ cla = CleanerAgent()
coa = CollectorAgent()
def checkAPIkey(key):
return apikeystore.check_key(key)
def allAPIkeys():
return [apikeystore[k] for k in apikeystore]
####
## Getting dict representations of database objects
####
def get_scrobble_dict(o):
track = get_track_dict(TRACKS[o.track])
return {"artists":track["artists"],"title":track["title"],"time":o.timestamp,"album":o.album,"duration":o.duration}
def get_artist_dict(o):
return o
#technically not a dict, but... you know
def get_track_dict(o):
artists = [get_artist_dict(ARTISTS[a]) for a in o.artists]
return {"artists":artists,"title":o.title}
####
## Creating or finding existing database entries
####
def createScrobble(artists,title,time,album=None,duration=None,volatile=False):
@ -138,23 +109,7 @@ def createScrobble(artists,title,time,album=None,duration=None,volatile=False):
########
########
# skip regular authentication if api key is present in request
# an api key now ONLY permits scrobbling tracks, no other admin tasks
def api_key_correct(request):
args = request.params
try:
args.update(request.json)
except:
pass
if "key" in args:
apikey = args["key"]
del args["key"]
elif "apikey" in args:
apikey = args["apikey"]
del args["apikey"]
else: return False
return checkAPIkey(apikey)