mirror of
https://github.com/krateng/maloja.git
synced 2023-08-10 21:12:55 +03:00
Refactoring (#83)
* Merge isinstance calls * Inline variable that is immediately returned * Replace set() with comprehension * Replace assignment with augmented assignment * Remove unnecessary else after guard condition * Convert for loop into list comprehension * Replace unused for index with underscore * Merge nested if conditions * Convert for loop into list comprehension * Convert for loop into set comprehension * Remove unnecessary else after guard condition * Replace if statements with if expressions * Simplify sequence comparison * Replace multiple comparisons with in operator * Merge isinstance calls * Merge nested if conditions * Add guard clause * Merge duplicate blocks in conditional * Replace unneeded comprehension with generator * Inline variable that is immediately returned * Remove unused imports * Replace unneeded comprehension with generator * Remove unused imports * Remove unused import * Inline variable that is immediately returned * Swap if/else branches and remove unnecessary else * Use str.join() instead of for loop * Multiple refactors - Remove redundant pass statement - Hoist repeated code outside conditional statement - Swap if/else to remove empty if body * Inline variable that is immediately returned * Simplify generator expression * Replace if statement with if expression * Multiple refactoring - Replace range(0, x) with range(x) - Swap if/else branches - Remove unnecessary else after guard condition * Use str.join() instead of for loop * Hoist repeated code outside conditional statement * Use str.join() instead of for loop * Inline variables that are immediately returned * Merge dictionary assignment with declaration * Use items() to directly unpack dictionary values * Extract dup code from methods into a new one
This commit is contained in:
@@ -54,26 +54,25 @@ class Audioscrobbler(APIHandler):
|
||||
def submit_scrobble(self,pathnodes,keys):
|
||||
if keys.get("sk") is None or keys.get("sk") not in self.mobile_sessions:
|
||||
raise InvalidSessionKey()
|
||||
if "track" in keys and "artist" in keys:
|
||||
artiststr,titlestr = keys["artist"], keys["track"]
|
||||
#(artists,title) = cla.fullclean(artiststr,titlestr)
|
||||
try:
|
||||
timestamp = int(keys["timestamp"])
|
||||
except:
|
||||
timestamp = None
|
||||
#database.createScrobble(artists,title,timestamp)
|
||||
self.scrobble(artiststr,titlestr,time=timestamp)
|
||||
else:
|
||||
if "track" in keys and "artist" in keys:
|
||||
artiststr,titlestr = keys["artist"], keys["track"]
|
||||
#(artists,title) = cla.fullclean(artiststr,titlestr)
|
||||
try:
|
||||
timestamp = int(keys["timestamp"])
|
||||
except:
|
||||
timestamp = None
|
||||
#database.createScrobble(artists,title,timestamp)
|
||||
self.scrobble(artiststr,titlestr,time=timestamp)
|
||||
return 200,{"scrobbles":{"@attr":{"ignored":0}}}
|
||||
else:
|
||||
for num in range(50):
|
||||
if "track[" + str(num) + "]" in keys:
|
||||
artiststr,titlestr = keys["artist[" + str(num) + "]"], keys["track[" + str(num) + "]"]
|
||||
#(artists,title) = cla.fullclean(artiststr,titlestr)
|
||||
timestamp = int(keys["timestamp[" + str(num) + "]"])
|
||||
#database.createScrobble(artists,title,timestamp)
|
||||
self.scrobble(artiststr,titlestr,time=timestamp)
|
||||
return 200,{"scrobbles":{"@attr":{"ignored":0}}}
|
||||
for num in range(50):
|
||||
if "track[" + str(num) + "]" in keys:
|
||||
artiststr,titlestr = keys["artist[" + str(num) + "]"], keys["track[" + str(num) + "]"]
|
||||
#(artists,title) = cla.fullclean(artiststr,titlestr)
|
||||
timestamp = int(keys["timestamp[" + str(num) + "]"])
|
||||
#database.createScrobble(artists,title,timestamp)
|
||||
self.scrobble(artiststr,titlestr,time=timestamp)
|
||||
|
||||
return 200,{"scrobbles":{"@attr":{"ignored":0}}}
|
||||
|
||||
|
||||
import hashlib
|
||||
@@ -85,8 +84,11 @@ def md5(input):
|
||||
return m.hexdigest()
|
||||
|
||||
def generate_key(ls):
|
||||
key = ""
|
||||
for i in range(64):
|
||||
key += str(random.choice(list(range(10)) + list("abcdefghijklmnopqrstuvwxyz") + list("ABCDEFGHIJKLMNOPQRSTUVWXYZ")))
|
||||
key = "".join(
|
||||
str(
|
||||
random.choice(
|
||||
list(range(10)) + list("abcdefghijklmnopqrstuvwxyz") +
|
||||
list("ABCDEFGHIJKLMNOPQRSTUVWXYZ"))) for _ in range(64))
|
||||
|
||||
ls.append(key)
|
||||
return key
|
||||
|
@@ -38,8 +38,7 @@ class AudioscrobblerLegacy(APIHandler):
|
||||
timestamp = keys.get("t")
|
||||
apikey = keys.get("api_key")
|
||||
host = keys.get("Host")
|
||||
protocol = request.urlparts.scheme
|
||||
if (keys.get("u") == 'nossl'): protocol = 'http' #user override
|
||||
protocol = 'http' if (keys.get("u") == 'nossl') else request.urlparts.scheme
|
||||
|
||||
if auth is not None:
|
||||
for key in database.allAPIkeys():
|
||||
@@ -68,22 +67,20 @@ class AudioscrobblerLegacy(APIHandler):
|
||||
def submit_scrobble(self,pathnodes,keys):
|
||||
if keys.get("s") is None or keys.get("s") not in self.mobile_sessions:
|
||||
raise InvalidSessionKey()
|
||||
else:
|
||||
for count in range(0,50):
|
||||
artist_key = f"a[{count}]"
|
||||
track_key = f"t[{count}]"
|
||||
time_key = f"i[{count}]"
|
||||
if artist_key in keys and track_key in keys:
|
||||
artiststr,titlestr = keys[artist_key], keys[track_key]
|
||||
try:
|
||||
timestamp = int(keys[time_key])
|
||||
except:
|
||||
timestamp = None
|
||||
#database.createScrobble(artists,title,timestamp)
|
||||
self.scrobble(artiststr,titlestr,time=timestamp)
|
||||
else:
|
||||
return 200,"OK\n"
|
||||
return 200,"OK\n"
|
||||
for count in range(50):
|
||||
artist_key = f"a[{count}]"
|
||||
track_key = f"t[{count}]"
|
||||
time_key = f"i[{count}]"
|
||||
if artist_key not in keys or track_key not in keys:
|
||||
return 200,"OK\n"
|
||||
artiststr,titlestr = keys[artist_key], keys[track_key]
|
||||
try:
|
||||
timestamp = int(keys[time_key])
|
||||
except:
|
||||
timestamp = None
|
||||
#database.createScrobble(artists,title,timestamp)
|
||||
self.scrobble(artiststr,titlestr,time=timestamp)
|
||||
return 200,"OK\n"
|
||||
|
||||
|
||||
import hashlib
|
||||
@@ -95,9 +92,12 @@ def md5(input):
|
||||
return m.hexdigest()
|
||||
|
||||
def generate_key(ls):
|
||||
key = ""
|
||||
for i in range(64):
|
||||
key += str(random.choice(list(range(10)) + list("abcdefghijklmnopqrstuvwxyz") + list("ABCDEFGHIJKLMNOPQRSTUVWXYZ")))
|
||||
key = "".join(
|
||||
str(
|
||||
random.choice(
|
||||
list(range(10)) + list("abcdefghijklmnopqrstuvwxyz") +
|
||||
list("ABCDEFGHIJKLMNOPQRSTUVWXYZ"))) for _ in range(64))
|
||||
|
||||
ls.append(key)
|
||||
return key
|
||||
|
||||
|
@@ -72,8 +72,7 @@ def info_external(**keys):
|
||||
response.set_header("Access-Control-Allow-Origin","*")
|
||||
response.set_header("Content-Type","application/json")
|
||||
|
||||
result = info()
|
||||
return result
|
||||
return info()
|
||||
|
||||
|
||||
|
||||
@@ -178,8 +177,7 @@ def artistInfo_external(**keys):
|
||||
k_filter, _, _, _, _ = uri_to_internal(keys,forceArtist=True)
|
||||
ckeys = {**k_filter}
|
||||
|
||||
results = artistInfo(**ckeys)
|
||||
return results
|
||||
return artistInfo(**ckeys)
|
||||
|
||||
|
||||
|
||||
@@ -192,15 +190,12 @@ def trackInfo_external(artist:Multi[str],**keys):
|
||||
k_filter, _, _, _, _ = uri_to_internal(keys,forceTrack=True)
|
||||
ckeys = {**k_filter}
|
||||
|
||||
results = trackInfo(**ckeys)
|
||||
return results
|
||||
return trackInfo(**ckeys)
|
||||
|
||||
|
||||
@api.get("compare")
|
||||
def compare_external(**keys):
|
||||
|
||||
results = compare(keys["remote"])
|
||||
return results
|
||||
return compare(keys["remote"])
|
||||
|
||||
|
||||
|
||||
@@ -297,8 +292,10 @@ def search(**keys):
|
||||
# add links
|
||||
artists_result = []
|
||||
for a in artists:
|
||||
result = {"name":a}
|
||||
result["link"] = "/artist?" + compose_querystring(internal_to_uri({"artist":a}))
|
||||
result = {
|
||||
'name': a,
|
||||
'link': "/artist?" + compose_querystring(internal_to_uri({"artist": a})),
|
||||
}
|
||||
result["image"] = "/image?" + compose_querystring(internal_to_uri({"artist":a}))
|
||||
artists_result.append(result)
|
||||
|
||||
|
@@ -35,12 +35,12 @@ class CleanerAgent:
|
||||
reqartists, allartists = self.rules_addartists[title.lower()]
|
||||
reqartists = reqartists.split("␟")
|
||||
allartists = allartists.split("␟")
|
||||
if set(reqartists).issubset(set(a.lower() for a in artists)):
|
||||
if set(reqartists).issubset({a.lower() for a in artists}):
|
||||
artists += allartists
|
||||
elif title.lower() in self.rules_fixartists:
|
||||
allartists = self.rules_fixartists[title.lower()]
|
||||
allartists = allartists.split("␟")
|
||||
if len(set(a.lower() for a in allartists) & set(a.lower() for a in artists)) > 0:
|
||||
if len({a.lower() for a in allartists} & {a.lower() for a in artists}) > 0:
|
||||
artists = allartists
|
||||
artists = list(set(artists))
|
||||
artists.sort()
|
||||
@@ -50,10 +50,9 @@ class CleanerAgent:
|
||||
def removespecial(self,s):
|
||||
if isinstance(s,list):
|
||||
return [self.removespecial(se) for se in s]
|
||||
else:
|
||||
s = s.replace("\t","").replace("␟","").replace("\n","")
|
||||
s = re.sub(" +"," ",s)
|
||||
return s
|
||||
s = s.replace("\t","").replace("␟","").replace("\n","")
|
||||
s = re.sub(" +"," ",s)
|
||||
return s
|
||||
|
||||
|
||||
# if an artist appears in any created rule, we can assume that artist is meant to exist and be spelled like that
|
||||
@@ -206,9 +205,7 @@ class CollectorAgent:
|
||||
|
||||
# get all credited artists for the artists given
|
||||
def getCreditedList(self,artists):
|
||||
updatedArtists = []
|
||||
for artist in artists:
|
||||
updatedArtists.append(self.getCredited(artist))
|
||||
updatedArtists = [self.getCredited(artist) for artist in artists]
|
||||
return list(set(updatedArtists))
|
||||
|
||||
# get artists who the given artist is given credit for
|
||||
@@ -218,7 +215,7 @@ class CollectorAgent:
|
||||
# this function is there to check for artists that we should include in the
|
||||
# database even though they never have any scrobble.
|
||||
def getAllArtists(self):
|
||||
return list(set([self.rules_countas[a] for a in self.rules_countas]))
|
||||
return list({self.rules_countas[a] for a in self.rules_countas})
|
||||
# artists that count can be nonexisting (counting HyunA as 4Minute even
|
||||
# though 4Minute has never been listened to)
|
||||
# but artists that are counted as someone else are only relevant if they
|
||||
@@ -239,6 +236,6 @@ def flatten(lis):
|
||||
if isinstance(l, str):
|
||||
newlist.append(l)
|
||||
else:
|
||||
newlist = newlist + l
|
||||
newlist += l
|
||||
|
||||
return list(set(newlist))
|
||||
|
@@ -149,10 +149,9 @@ def createScrobble(artists,title,time,album=None,duration=None,volatile=False):
|
||||
i = getTrackID(artists,title)
|
||||
|
||||
# idempotence
|
||||
if time in SCROBBLESDICT:
|
||||
if i == SCROBBLESDICT[time].track:
|
||||
dblock.release()
|
||||
return get_track_dict(TRACKS[i])
|
||||
if time in SCROBBLESDICT and i == SCROBBLESDICT[time].track:
|
||||
dblock.release()
|
||||
return get_track_dict(TRACKS[i])
|
||||
# timestamp as unique identifier
|
||||
while (time in SCROBBLESDICT):
|
||||
time += 1
|
||||
@@ -192,35 +191,31 @@ def getArtistID(name):
|
||||
if obj_normalized in ARTISTS_NORMALIZED_SET:
|
||||
return ARTISTS_NORMALIZED.index(obj_normalized)
|
||||
|
||||
else:
|
||||
i = len(ARTISTS)
|
||||
ARTISTS.append(obj)
|
||||
ARTISTS_NORMALIZED_SET.add(obj_normalized)
|
||||
ARTISTS_NORMALIZED.append(obj_normalized)
|
||||
i = len(ARTISTS)
|
||||
ARTISTS.append(obj)
|
||||
ARTISTS_NORMALIZED_SET.add(obj_normalized)
|
||||
ARTISTS_NORMALIZED.append(obj_normalized)
|
||||
|
||||
# with a new artist added, we might also get new artists that they are credited as
|
||||
cr = coa.getCredited(name)
|
||||
getArtistID(cr)
|
||||
# with a new artist added, we might also get new artists that they are credited as
|
||||
cr = coa.getCredited(name)
|
||||
getArtistID(cr)
|
||||
|
||||
coa.updateIDs(ARTISTS)
|
||||
coa.updateIDs(ARTISTS)
|
||||
|
||||
return i
|
||||
return i
|
||||
|
||||
def getTrackID(artists,title):
|
||||
artistset = set()
|
||||
for a in artists:
|
||||
artistset.add(getArtistID(name=a))
|
||||
artistset = {getArtistID(name=a) for a in artists}
|
||||
obj = Track(artists=frozenset(artistset),title=title)
|
||||
obj_normalized = Track(artists=frozenset(artistset),title=normalize_name(title))
|
||||
|
||||
if obj_normalized in TRACKS_NORMALIZED_SET:
|
||||
return TRACKS_NORMALIZED.index(obj_normalized)
|
||||
else:
|
||||
i = len(TRACKS)
|
||||
TRACKS.append(obj)
|
||||
TRACKS_NORMALIZED_SET.add(obj_normalized)
|
||||
TRACKS_NORMALIZED.append(obj_normalized)
|
||||
return i
|
||||
i = len(TRACKS)
|
||||
TRACKS.append(obj)
|
||||
TRACKS_NORMALIZED_SET.add(obj_normalized)
|
||||
TRACKS_NORMALIZED.append(obj_normalized)
|
||||
return i
|
||||
|
||||
import unicodedata
|
||||
|
||||
@@ -330,11 +325,7 @@ def get_scrobbles_num(**keys):
|
||||
|
||||
def get_tracks(artist=None):
|
||||
|
||||
if artist is not None:
|
||||
artistid = ARTISTS.index(artist)
|
||||
else:
|
||||
artistid = None
|
||||
|
||||
artistid = ARTISTS.index(artist) if artist is not None else None
|
||||
# Option 1
|
||||
return [get_track_dict(t) for t in TRACKS if (artistid in t.artists) or (artistid==None)]
|
||||
|
||||
@@ -639,7 +630,7 @@ def check_issues():
|
||||
duplicates.append((a,ar))
|
||||
|
||||
st = st.replace("&","").replace("and","").replace("with","").strip()
|
||||
if st != "" and st != a:
|
||||
if st not in ["", a]:
|
||||
if len(st) < 5 and len(lis) == 1:
|
||||
#check if we havent just randomly found the string in another word
|
||||
#if (" " + st + " ") in lis[0] or (lis[0].endswith(" " + st)) or (lis[0].startswith(st + " ")):
|
||||
@@ -694,14 +685,9 @@ def get_predefined_rulesets():
|
||||
if f.endswith(".tsv"):
|
||||
|
||||
rawf = f.replace(".tsv","")
|
||||
valid = True
|
||||
for char in rawf:
|
||||
if char not in validchars:
|
||||
valid = False
|
||||
break # don't even show up invalid filenames
|
||||
|
||||
valid = all(char in validchars for char in rawf)
|
||||
if not valid: continue
|
||||
if not "_" in rawf: continue
|
||||
if "_" not in rawf: continue
|
||||
|
||||
try:
|
||||
with open(data_dir['rules']("predefined",f)) as tsvfile:
|
||||
@@ -711,21 +697,14 @@ def get_predefined_rulesets():
|
||||
if "# NAME: " in line1:
|
||||
name = line1.replace("# NAME: ","")
|
||||
else: name = rawf.split("_")[1]
|
||||
if "# DESC: " in line2:
|
||||
desc = line2.replace("# DESC: ","")
|
||||
else: desc = ""
|
||||
|
||||
desc = line2.replace("# DESC: ","") if "# DESC: " in line2 else ""
|
||||
author = rawf.split("_")[0]
|
||||
except:
|
||||
continue
|
||||
|
||||
ruleset = {"file":rawf}
|
||||
rulesets.append(ruleset)
|
||||
if os.path.exists(data_dir['rules'](f)):
|
||||
ruleset["active"] = True
|
||||
else:
|
||||
ruleset["active"] = False
|
||||
|
||||
ruleset["active"] = bool(os.path.exists(data_dir['rules'](f)))
|
||||
ruleset["name"] = name
|
||||
ruleset["author"] = author
|
||||
ruleset["desc"] = desc
|
||||
@@ -805,7 +784,7 @@ def build_db():
|
||||
STAMPS.sort()
|
||||
|
||||
# inform malojatime module about earliest scrobble
|
||||
if len(STAMPS) > 0: register_scrobbletime(STAMPS[0])
|
||||
if STAMPS: register_scrobbletime(STAMPS[0])
|
||||
|
||||
# NOT NEEDED BECAUSE WE DO THAT ON ADDING EVERY ARTIST ANYWAY
|
||||
# get extra artists with no real scrobbles from countas rules
|
||||
@@ -1155,20 +1134,13 @@ def db_aggregate_full(by=None,since=None,to=None,within=None,timerange=None,arti
|
||||
|
||||
# Search for strings
|
||||
def db_search(query,type=None):
|
||||
results = []
|
||||
if type=="ARTIST":
|
||||
results = []
|
||||
for a in ARTISTS:
|
||||
#if query.lower() in a.lower():
|
||||
if simplestr(query) in simplestr(a):
|
||||
results.append(a)
|
||||
|
||||
results = [a for a in ARTISTS if simplestr(query) in simplestr(a)]
|
||||
if type=="TRACK":
|
||||
results = []
|
||||
for t in TRACKS:
|
||||
#if query.lower() in t[1].lower():
|
||||
if simplestr(query) in simplestr(t[1]):
|
||||
results.append(get_track_dict(t))
|
||||
|
||||
results = [
|
||||
get_track_dict(t) for t in TRACKS if simplestr(query) in simplestr(t[1])
|
||||
]
|
||||
return results
|
||||
|
||||
|
||||
@@ -1227,7 +1199,7 @@ def scrobbles_in_range(start,end,reverse=False):
|
||||
# for performance testing
|
||||
def generateStuff(num=0,pertrack=0,mult=0):
|
||||
import random
|
||||
for i in range(num):
|
||||
for _ in range(num):
|
||||
track = random.choice(TRACKS)
|
||||
t = get_track_dict(track)
|
||||
time = random.randint(STAMPS[0],STAMPS[-1])
|
||||
@@ -1235,7 +1207,7 @@ def generateStuff(num=0,pertrack=0,mult=0):
|
||||
|
||||
for track in TRACKS:
|
||||
t = get_track_dict(track)
|
||||
for i in range(pertrack):
|
||||
for _ in range(pertrack):
|
||||
time = random.randint(STAMPS[0],STAMPS[-1])
|
||||
createScrobble(t["artists"],t["title"],time,volatile=True)
|
||||
|
||||
|
@@ -42,12 +42,11 @@ def find_representative(sequence,attribute_id,attribute_count):
|
||||
|
||||
|
||||
def combine_dicts(dictlist):
|
||||
res = {k:d[k] for d in dictlist for k in d}
|
||||
return res
|
||||
return {k:d[k] for d in dictlist for k in d}
|
||||
|
||||
|
||||
def compare_key_in_dicts(key,d1,d2):
|
||||
return d1[key] == d2[key]
|
||||
|
||||
def alltrue(seq):
|
||||
return all(s for s in seq)
|
||||
return all(seq)
|
||||
|
@@ -77,7 +77,7 @@ class MRangeDescriptor:
|
||||
class MTime(MRangeDescriptor):
|
||||
def __init__(self,*ls):
|
||||
# in case we want to call with non-unpacked arguments
|
||||
if isinstance(ls[0],tuple) or isinstance(ls[0],list):
|
||||
if isinstance(ls[0], (tuple, list)):
|
||||
ls = ls[0]
|
||||
|
||||
self.tup = tuple(ls)
|
||||
@@ -104,9 +104,7 @@ class MTime(MRangeDescriptor):
|
||||
if tod.year == self.year:
|
||||
if tod.month > self.month: return False
|
||||
if self.precision == 2: return True
|
||||
if tod.month == self.month:
|
||||
if tod.day > self.day: return False
|
||||
|
||||
if tod.month == self.month and tod.day > self.day: return False
|
||||
return True
|
||||
|
||||
|
||||
@@ -144,21 +142,21 @@ class MTime(MRangeDescriptor):
|
||||
|
||||
# describes only the parts that are different than another range object
|
||||
def contextual_desc(self,other):
|
||||
if isinstance(other,MTime):
|
||||
relevant = self.desc().split(" ")
|
||||
if self.year == other.year:
|
||||
if not isinstance(other, MTime):
|
||||
return self.desc()
|
||||
relevant = self.desc().split(" ")
|
||||
if self.year == other.year:
|
||||
relevant.pop()
|
||||
if self.precision > 1 and other.precision > 1 and self.month == other.month:
|
||||
relevant.pop()
|
||||
if self.precision > 1 and other.precision > 1 and self.month == other.month:
|
||||
if self.precision > 2 and other.precision > 2 and self.day == other.day:
|
||||
relevant.pop()
|
||||
if self.precision > 2 and other.precision > 2 and self.day == other.day:
|
||||
relevant.pop()
|
||||
return " ".join(relevant)
|
||||
return self.desc()
|
||||
return " ".join(relevant)
|
||||
|
||||
# gets object with one higher precision that starts this one
|
||||
def start(self):
|
||||
if self.precision == 1: return MTime(self.tup + (1,))
|
||||
elif self.precision == 2: return MTime(self.tup + (1,))
|
||||
if self.precision in [1, 2]: return MTime(self.tup + (1,))
|
||||
|
||||
# gets object with one higher precision that ends this one
|
||||
def end(self):
|
||||
if self.precision == 1: return MTime(self.tup + (12,))
|
||||
@@ -251,8 +249,7 @@ class MTimeWeek(MRangeDescriptor):
|
||||
return self.desc()
|
||||
|
||||
def contextual_desc(self,other):
|
||||
if isinstance(other,MTimeWeek):
|
||||
if other.year == self.year: return "Week " + str(self.week)
|
||||
if isinstance(other, MTimeWeek) and other.year == self.year: return "Week " + str(self.week)
|
||||
return self.desc()
|
||||
|
||||
def start(self):
|
||||
|
@@ -132,7 +132,7 @@ def compose_querystring(*dicts,exclude=[]):
|
||||
for k in keys:
|
||||
if k in exclude: continue
|
||||
values = keys.getall(k)
|
||||
st += "&".join([urllib.parse.urlencode({k:v},safe="/") for v in values])
|
||||
st += "&".join(urllib.parse.urlencode({k:v},safe="/") for v in values)
|
||||
st += "&"
|
||||
return st[:-1] if st.endswith("&") else st # remove last ampersand
|
||||
|
||||
|
@@ -11,16 +11,14 @@ from . import tasks
|
||||
def getInstance():
|
||||
try:
|
||||
output = subprocess.check_output(["pidof","Maloja"])
|
||||
pid = int(output)
|
||||
return pid
|
||||
return int(output)
|
||||
except:
|
||||
return None
|
||||
|
||||
def getInstanceSupervisor():
|
||||
try:
|
||||
output = subprocess.check_output(["pidof","maloja_supervisor"])
|
||||
pid = int(output)
|
||||
return pid
|
||||
return int(output)
|
||||
except:
|
||||
return None
|
||||
|
||||
@@ -60,12 +58,12 @@ def stop():
|
||||
if pid is not None:
|
||||
os.kill(pid,signal.SIGTERM)
|
||||
|
||||
if pid is not None or pid_sv is not None:
|
||||
print("Maloja stopped!")
|
||||
return True
|
||||
else:
|
||||
if pid is None and pid_sv is None:
|
||||
return False
|
||||
|
||||
print("Maloja stopped!")
|
||||
return True
|
||||
|
||||
|
||||
|
||||
def direct():
|
||||
|
@@ -26,10 +26,7 @@ def copy_initial_local_files():
|
||||
charset = list(range(10)) + list("abcdefghijklmnopqrstuvwxyz") + list("ABCDEFGHIJKLMNOPQRSTUVWXYZ")
|
||||
def randomstring(length=32):
|
||||
import random
|
||||
key = ""
|
||||
for i in range(length):
|
||||
key += str(random.choice(charset))
|
||||
return key
|
||||
return "".join(str(random.choice(charset)) for _ in range(length))
|
||||
|
||||
def setup():
|
||||
|
||||
@@ -50,18 +47,13 @@ def setup():
|
||||
|
||||
|
||||
# OWN API KEY
|
||||
if os.path.exists(data_dir['clients']("authenticated_machines.tsv")):
|
||||
pass
|
||||
else:
|
||||
if not os.path.exists(data_dir['clients']("authenticated_machines.tsv")):
|
||||
answer = ask("Do you want to set up a key to enable scrobbling? Your scrobble extension needs that key so that only you can scrobble tracks to your database.",default=True,skip=SKIP)
|
||||
if answer:
|
||||
key = randomstring(64)
|
||||
print("Your API Key: " + col["yellow"](key))
|
||||
with open(data_dir['clients']("authenticated_machines.tsv"),"w") as keyfile:
|
||||
keyfile.write(key + "\t" + "Default Generated Key")
|
||||
else:
|
||||
pass
|
||||
|
||||
|
||||
# PASSWORD
|
||||
defaultpassword = settings.get_settings("DEFAULT_PASSWORD")
|
||||
@@ -81,14 +73,11 @@ def setup():
|
||||
if newpw is None:
|
||||
newpw = defaultpassword
|
||||
print("Generated password:",newpw)
|
||||
auth.defaultuser.setpw(newpw)
|
||||
else:
|
||||
# docker installation (or settings file, but don't do that)
|
||||
# we still 'ask' the user to set one, but for docker this will be skipped
|
||||
newpw = prompt("Please set a password for web backend access. Leave this empty to use the default password.",skip=SKIP,default=defaultpassword,secret=True)
|
||||
auth.defaultuser.setpw(newpw)
|
||||
|
||||
|
||||
auth.defaultuser.setpw(newpw)
|
||||
if settings.get_settings("NAME") is None:
|
||||
name = prompt("Please enter your name. This will be displayed e.g. when comparing your charts to another user. Leave this empty if you would not like to specify a name right now.",default="Generic Maloja User",skip=SKIP)
|
||||
settings.update_settings(data_dir['settings']("settings.ini"),{"NAME":name},create_new=True)
|
||||
|
@@ -21,8 +21,11 @@ def update():
|
||||
|
||||
def start():
|
||||
try:
|
||||
p = subprocess.Popen(["python3","-m","maloja.server"],stdout=subprocess.DEVNULL,stderr=subprocess.DEVNULL)
|
||||
return p
|
||||
return subprocess.Popen(
|
||||
["python3", "-m", "maloja.server"],
|
||||
stdout=subprocess.DEVNULL,
|
||||
stderr=subprocess.DEVNULL,
|
||||
)
|
||||
except e:
|
||||
log("Error starting Maloja: " + str(e),module="supervisor")
|
||||
|
||||
|
@@ -39,8 +39,8 @@ def backup(folder,level="full"):
|
||||
archivefile = os.path.join(folder,filename)
|
||||
assert not os.path.exists(archivefile)
|
||||
with tarfile.open(name=archivefile,mode="x:gz") as archive:
|
||||
for cat in real_files:
|
||||
for f in real_files[cat]:
|
||||
for cat, value in real_files.items():
|
||||
for f in value:
|
||||
p = PurePath(f)
|
||||
r = p.relative_to(data_dir[cat]())
|
||||
archive.add(f,arcname=os.path.join(cat,r))
|
||||
|
@@ -94,8 +94,7 @@ def clean_html(inp):
|
||||
@webserver.route("")
|
||||
@webserver.route("/")
|
||||
def mainpage():
|
||||
response = static_html("start")
|
||||
return response
|
||||
return static_html("start")
|
||||
|
||||
@webserver.error(400)
|
||||
@webserver.error(403)
|
||||
@@ -112,8 +111,12 @@ def customerror(error):
|
||||
adminmode = request.cookies.get("adminmode") == "true" and auth.check(request)
|
||||
|
||||
template = jinja_environment.get_template('error.jinja')
|
||||
res = template.render(errorcode=errorcode,errordesc=errordesc,traceback=traceback,adminmode=adminmode)
|
||||
return res
|
||||
return template.render(
|
||||
errorcode=errorcode,
|
||||
errordesc=errordesc,
|
||||
traceback=traceback,
|
||||
adminmode=adminmode,
|
||||
)
|
||||
|
||||
|
||||
|
||||
|
13
maloja/thirdparty/__init__.py
vendored
13
maloja/thirdparty/__init__.py
vendored
@@ -177,17 +177,14 @@ class MetadataInterface(GenericInterface,abstract=True):
|
||||
# default function to parse response by descending down nodes
|
||||
# override if more complicated
|
||||
def metadata_parse_response_artist(self,data):
|
||||
res = data
|
||||
for node in self.metadata["response_parse_tree_artist"]:
|
||||
try:
|
||||
res = res[node]
|
||||
except:
|
||||
return None
|
||||
return res
|
||||
return self._parse_response("response_parse_tree_artist", data)
|
||||
|
||||
def metadata_parse_response_track(self,data):
|
||||
return self._parse_response("response_parse_tree_track", data)
|
||||
|
||||
def _parse_response(self, resp, data):
|
||||
res = data
|
||||
for node in self.metadata["response_parse_tree_track"]:
|
||||
for node in self.metadata[resp]:
|
||||
try:
|
||||
res = res[node]
|
||||
except:
|
||||
|
3
maloja/thirdparty/musicbrainz.py
vendored
3
maloja/thirdparty/musicbrainz.py
vendored
@@ -1,5 +1,4 @@
|
||||
from . import MetadataInterface, utf, b64
|
||||
import hashlib
|
||||
from . import MetadataInterface
|
||||
import urllib.parse, urllib.request
|
||||
import json
|
||||
import time
|
||||
|
1
maloja/thirdparty/spotify.py
vendored
1
maloja/thirdparty/spotify.py
vendored
@@ -1,5 +1,4 @@
|
||||
from . import MetadataInterface, utf, b64
|
||||
import hashlib
|
||||
import urllib.parse, urllib.request
|
||||
import json
|
||||
from threading import Timer
|
||||
|
@@ -159,11 +159,10 @@ def getTrackImage(artists,title,fast=False):
|
||||
#result = cachedTracks[(frozenset(artists),title)]
|
||||
result = track_cache.get((frozenset(artists),title)) #track_from_cache(artists,title)
|
||||
if result is not None: return result
|
||||
else:
|
||||
for a in artists:
|
||||
res = getArtistImage(artist=a,fast=True)
|
||||
if res != "": return res
|
||||
return ""
|
||||
for a in artists:
|
||||
res = getArtistImage(artist=a,fast=True)
|
||||
if res != "": return res
|
||||
return ""
|
||||
except:
|
||||
pass
|
||||
|
||||
@@ -173,7 +172,9 @@ def getTrackImage(artists,title,fast=False):
|
||||
|
||||
|
||||
# fast request only retuns cached and local results, generates redirect link for rest
|
||||
if fast: return "/image?title=" + urllib.parse.quote(title) + "&" + "&".join(["artist=" + urllib.parse.quote(a) for a in artists])
|
||||
if fast:
|
||||
return ("/image?title=" + urllib.parse.quote(title) + "&" + "&".join(
|
||||
"artist=" + urllib.parse.quote(a) for a in artists))
|
||||
|
||||
# non-fast lookup (essentially only the resolver lookup)
|
||||
result = thirdparty.get_image_track_all((artists,title))
|
||||
@@ -184,11 +185,10 @@ def getTrackImage(artists,title,fast=False):
|
||||
|
||||
# return either result or redirect to artist
|
||||
if result is not None: return result
|
||||
else:
|
||||
for a in artists:
|
||||
res = getArtistImage(artist=a,fast=False)
|
||||
if res != "": return res
|
||||
return ""
|
||||
for a in artists:
|
||||
res = getArtistImage(artist=a,fast=False)
|
||||
if res != "": return res
|
||||
return ""
|
||||
|
||||
|
||||
def getArtistImage(artist,fast=False):
|
||||
|
@@ -12,7 +12,7 @@ def serialize(obj):
|
||||
try:
|
||||
return json.dumps(obj)
|
||||
except:
|
||||
if isinstance(obj,list) or isinstance(obj,tuple):
|
||||
if isinstance(obj, (list, tuple)):
|
||||
return "[" + ",".join(serialize(o) for o in obj) + "]"
|
||||
elif isinstance(obj,dict):
|
||||
return "{" + ",".join(serialize(o) + ":" + serialize(obj[o]) for o in obj) + "}"
|
||||
|
Reference in New Issue
Block a user