
523 lines
14 KiB

# server
from bottle import request, response, FormsDict, HTTPError
# rest of the project
from ..cleanup import CleanerAgent, CollectorAgent
from .. import images
from ..malojatime import register_scrobbletime, time_stamps, ranges, alltime
from ..malojauri import uri_to_internal, internal_to_uri, compose_querystring
from ..thirdparty import proxy_scrobble_all
from ..globalconf import data_dir, malojaconfig
from ..apis import apikeystore
from . import sqldb
from . import cached
from . import dbcache
# doreah toolkit
from doreah.logging import log
from doreah import tsv
from doreah.auth import authenticated_api, authenticated_api_with_alternate
import doreah
# technical
import os
import datetime
import sys
import unicodedata
from collections import namedtuple
from threading import Lock
import yaml, json
import math
# url handling
import urllib
# Readiness flags of the database layer. Both start out False; start_db()
# switches them to True as start-up progresses, and waitfordb() consults
# the "healthy" flag before letting a call through.
dbstatus = {
    "healthy": False,   # we can access the db
    "complete": False,  # information is complete
}
class DatabaseNotBuilt(HTTPError):
    """HTTP error raised while the database is not yet ready to serve requests.

    The instance carries a fixed, user-facing message explaining that the
    database upgrade is still in progress.
    """

    def __init__(self):
        # The parent constructor must be called so that the message really
        # becomes the response body. 503 (Service Unavailable) marks the
        # condition as temporary instead of bottle's default status of 500.
        super().__init__(
            status=503,
            body="The Maloja Database is being upgraded to Version 3. This could take several minutes.",
        )
def waitfordb(func):
    """Decorator that blocks calls to *func* until the database is healthy.

    Args:
        func: the callable to guard.

    Returns:
        A wrapper with the same name, docstring and signature metadata as
        *func* that forwards all positional and keyword arguments.

    Raises:
        DatabaseNotBuilt: from the wrapper, when ``dbstatus['healthy']`` is
            falsy at call time.
    """
    # Function-scope import so the block does not depend on a new
    # file-level import.
    from functools import wraps

    @wraps(func)  # keep __name__/__doc__ of the wrapped function intact
    def newfunc(*args, **kwargs):
        if not dbstatus['healthy']:
            raise DatabaseNotBuilt()
        return func(*args, **kwargs)

    return newfunc
# Module-level agent instances from the cleanup module.
# `cla` is used by incoming_scrobble (fullclean) and check_issues (confirmedReal).
cla = CleanerAgent()
# NOTE(review): `coa` is not referenced in the visible part of this file.
coa = CollectorAgent()
## Validates and normalises one incoming scrobble and reports the outcome as a status dict.
## this function accepts a flat dict - all info of the scrobble should be top level key
## but can contain a list as value
## the following keys are valid:
## scrobble_duration int
## scrobble_time int
## track_title str, mandatory
## track_artists list, mandatory
## track_length int
## album_name str
## album_artists list
## Returns {"status":"failure"} for invalid input, otherwise
## {"status":"success","scrobble":<internal scrobble dict>}.
def incoming_scrobble(rawscrobble,fix=True,client=None,dbconn=None,**kwargs):
# TODO: just collecting all extra kwargs now. at some point, rework the authenticated api with alt function
# to actually look at the converted args instead of the request object and remove the key
# so that this function right here doesnt get the key passed to it
# A scrobble is rejected (and logged) when track_artists is missing or empty
# or when track_title is missing.
if (not "track_artists" in rawscrobble) or (len(rawscrobble['track_artists']) == 0) or (not "track_title" in rawscrobble):
log(f"Incoming scrobble {rawscrobble} [Source: {client}] is not valid")
return {"status":"failure"}
log(f"Incoming scrobble [{client}]: {rawscrobble}")
# raw scrobble to processed info
# Shallow copy, so the key reassignments below do not touch the caller's dict.
scrobbleinfo = {**rawscrobble}
# With fix enabled, artists and title are passed through the CleanerAgent.
if fix:
scrobbleinfo['track_artists'],scrobbleinfo['track_title'] = cla.fullclean(scrobbleinfo['track_artists'],scrobbleinfo['track_title'])
# NOTE(review): the `int(` call below is left open in this view; the fallback
# expression used when no scrobble_time is supplied appears to be missing.
scrobbleinfo['scrobble_time'] = scrobbleinfo.get('scrobble_time') or int(
# processed info to internal scrobble dict
# NOTE(review): this dict literal is incomplete in this view - only the
# "origin" entry and the start of a comprehension over the remaining keys are
# visible; the exclusion list after `not in` and the closing braces are missing.
scrobbledict = {
"origin":f"client: {client}" if client else "generic",
k:scrobbleinfo[k] for k in scrobbleinfo if k not in
# NOTE(review): no use of `dbconn` and no storage call is visible before the
# return - confirm against version control.
return {"status":"success","scrobble":scrobbledict}
def get_scrobbles(dbconn=None, **keys):
    """Return the scrobbles of a time range, optionally limited to an artist or track.

    Keyword Args:
        timerange: required; object whose ``timestamps()`` returns ``(since, to)``.
        artist: when present, only scrobbles of this artist are fetched
            (takes precedence over ``track``).
        track: when present (and no ``artist``), only scrobbles of this track.

    Returns:
        A list holding the query result in reversed order.
    """
    since, to = keys.get('timerange').timestamps()
    if 'artist' in keys:
        result = sqldb.get_scrobbles_of_artist(artist=keys['artist'], since=since, to=to, dbconn=dbconn)
    elif 'track' in keys:
        result = sqldb.get_scrobbles_of_track(track=keys['track'], since=since, to=to, dbconn=dbconn)
    else:
        # Only fall back to the unfiltered query when no filter was given;
        # otherwise the filtered result above would be overwritten.
        result = sqldb.get_scrobbles(since=since, to=to, dbconn=dbconn)
    #return result[keys['page']*keys['perpage']:(keys['page']+1)*keys['perpage']]
    return list(reversed(result))
def get_scrobbles_num(dbconn=None, **keys):
    """Return the number of scrobbles in a time range, optionally per artist or track.

    Keyword Args:
        timerange: required; object whose ``timestamps()`` returns ``(since, to)``.
        artist: when present, count only this artist's scrobbles
            (takes precedence over ``track``).
        track: when present (and no ``artist``), count only this track's scrobbles.

    Returns:
        For filtered queries, the length of the fetched scrobble list (fetched
        with ``resolve_references=False``); otherwise whatever
        ``sqldb.get_scrobbles_num`` returns.
    """
    since, to = keys.get('timerange').timestamps()
    if 'artist' in keys:
        return len(sqldb.get_scrobbles_of_artist(
            artist=keys['artist'], since=since, to=to,
            resolve_references=False, dbconn=dbconn,
        ))
    if 'track' in keys:
        return len(sqldb.get_scrobbles_of_track(
            track=keys['track'], since=since, to=to,
            resolve_references=False, dbconn=dbconn,
        ))
    # Unfiltered count - reached only when neither filter was supplied, so a
    # filtered count is never overwritten.
    return sqldb.get_scrobbles_num(since=since, to=to, dbconn=dbconn)
def get_tracks(dbconn=None, **keys):
    """Return all tracks, or only the tracks of one artist.

    Keyword Args:
        artist: optional; when given (and not None) the result is limited to
            this artist's tracks.

    Returns:
        The result of ``sqldb.get_tracks`` or ``sqldb.get_tracks_of_artist``.
    """
    artist = keys.get('artist')
    if artist is None:
        return sqldb.get_tracks(dbconn=dbconn)
    # The per-artist query is a separate branch so that it does not replace
    # (or get replaced by) the unfiltered one.
    return sqldb.get_tracks_of_artist(artist, dbconn=dbconn)
def get_artists(dbconn=None):
    """Return all artists as delivered by ``sqldb.get_artists``."""
    artists = sqldb.get_artists(dbconn=dbconn)
    return artists
def get_charts_artists(dbconn=None, **keys):
    """Return per-artist scrobble counts for the given time range.

    Keyword Args:
        timerange: required; object whose ``timestamps()`` returns ``(since, to)``.

    Returns:
        The result of ``sqldb.count_scrobbles_by_artist`` for that window.
    """
    bounds = keys.get('timerange').timestamps()
    since, to = bounds
    return sqldb.count_scrobbles_by_artist(since=since, to=to, dbconn=dbconn)
def get_charts_tracks(dbconn=None, **keys):
    """Return per-track scrobble counts for a time range, optionally for one artist.

    Keyword Args:
        timerange: required; object whose ``timestamps()`` returns ``(since, to)``.
        artist: when present, only tracks of this artist are counted.

    Returns:
        The result of ``sqldb.count_scrobbles_by_track_of_artist`` or
        ``sqldb.count_scrobbles_by_track``.
    """
    since, to = keys.get('timerange').timestamps()
    if 'artist' in keys:
        return sqldb.count_scrobbles_by_track_of_artist(
            since=since, to=to, artist=keys['artist'], dbconn=dbconn,
        )
    # Global chart only when no artist filter was requested, so the
    # artist-specific chart is not discarded.
    return sqldb.count_scrobbles_by_track(since=since, to=to, dbconn=dbconn)
# Counts scrobbles for every sub-range of the requested period.
# Only the time-related keys (since, to, within, timerange, step, stepn, trail)
# are handed to ranges(); every key except 'timerange' is forwarded to
# get_scrobbles_num together with the current sub-range.
def get_pulse(dbconn=None,**keys):
rngs = ranges(**{k:keys[k] for k in keys if k in ["since","to","within","timerange","step","stepn","trail"]})
results = []
for rng in rngs:
res = get_scrobbles_num(timerange=rng,**{k:keys[k] for k in keys if k != 'timerange'},dbconn=dbconn)
# NOTE(review): `res` is never added to `results` in this view, so an empty
# list is returned; a line appending the per-range record appears to be missing.
return results
# Looks up the chart rank of one track or one artist for every sub-range of
# the requested period. 'track' takes precedence over 'artist' when both keys
# are present. `rank` stays None when the entity is not found in a sub-range's charts.
def get_performance(dbconn=None,**keys):
rngs = ranges(**{k:keys[k] for k in keys if k in ["since","to","within","timerange","step","stepn","trail"]})
results = []
for rng in rngs:
if "track" in keys:
# NOTE(review): this lookup does not depend on `rng` but is repeated per sub-range.
track = sqldb.get_track(sqldb.get_track_id(keys['track'],dbconn=dbconn),dbconn=dbconn)
charts = get_charts_tracks(timerange=rng,dbconn=dbconn)
rank = None
for c in charts:
if c["track"] == track:
rank = c["rank"]
elif "artist" in keys:
# NOTE(review): this lookup does not depend on `rng` either.
artist = sqldb.get_artist(sqldb.get_artist_id(keys['artist'],dbconn=dbconn),dbconn=dbconn)
# ^this is the most useless line in programming history
# but I like consistency
charts = get_charts_artists(timerange=rng,dbconn=dbconn)
rank = None
for c in charts:
if c["artist"] == artist:
rank = c["rank"]
# NOTE(review): `rank` is never stored in `results` in this view, so an empty
# list is returned; the per-range append appears to be missing. No branch is
# visible for the case that neither 'track' nor 'artist' is given.
return results
# Takes the first entry of the artist charts for every sub-range of the
# requested period.
def get_top_artists(dbconn=None,**keys):
rngs = ranges(**{k:keys[k] for k in keys if k in ["since","to","within","timerange","step","stepn","trail"]})
results = []
for rng in rngs:
# NOTE(review): `[0]` raises IndexError when a sub-range has no chart entries;
# no guard is visible here.
res = get_charts_artists(timerange=rng,dbconn=dbconn)[0]
# NOTE(review): `res` is never added to `results` in this view, so an empty
# list is returned; the per-range append appears to be missing.
return results
# Takes the first entry of the track charts for every sub-range of the
# requested period.
def get_top_tracks(dbconn=None,**keys):
rngs = ranges(**{k:keys[k] for k in keys if k in ["since","to","within","timerange","step","stepn","trail"]})
results = []
for rng in rngs:
# NOTE(review): `[0]` raises IndexError when a sub-range has no chart entries;
# no guard is visible here.
res = get_charts_tracks(timerange=rng,dbconn=dbconn)[0]
# NOTE(review): `res` is never added to `results` in this view, so an empty
# list is returned; the per-range append appears to be missing.
return results
# Collects summary information about one artist (keys['artist']): the all-time
# scrobble count, the all-time chart rank and the years/weeks in which the
# artist appears in the cached medal and weekly-top data.
def artist_info(dbconn=None,**keys):
artist = keys.get('artist')
# Round trip through the database to obtain the stored artist.
artist = sqldb.get_artist(sqldb.get_artist_id(artist,dbconn=dbconn),dbconn=dbconn)
alltimecharts = get_charts_artists(timerange=alltime(),dbconn=dbconn)
scrobbles = get_scrobbles_num(artist=artist,timerange=alltime(),dbconn=dbconn)
#we cant take the scrobble number from the charts because that includes all countas scrobbles
# NOTE(review): `[0]` raises IndexError when the artist has no chart entry; the
# second half of this function reads like the handler for that case, but no
# try/except is visible in this view.
c = [e for e in alltimecharts if e["artist"] == artist][0]
others = sqldb.get_associated_artists(artist,dbconn=dbconn)
position = c["rank"]
# NOTE(review): this return literal is incomplete in this view - it is never
# closed, and `scrobbles`, `position` and `others` do not appear in it.
return {
"gold": [year for year in cached.medals_artists if artist in cached.medals_artists[year]['gold']],
"silver": [year for year in cached.medals_artists if artist in cached.medals_artists[year]['silver']],
"bronze": [year for year in cached.medals_artists if artist in cached.medals_artists[year]['bronze']],
"topweeks":len([e for e in cached.weekly_topartists if e == artist])
# if the artist isnt in the charts, they are not being credited and we
# need to show information about the credited one
# NOTE(review): unlike the other sqldb calls here, this one does not pass dbconn.
replaceartist = sqldb.get_credited_artists(artist)[0]
c = [e for e in alltimecharts if e["artist"] == replaceartist][0]
position = c["rank"]
return {"artist":artist,"replace":replaceartist,"scrobbles":scrobbles,"position":position}
# Collects summary information about one track (keys['track']): scrobble count
# and rank from the all-time track charts, a certification level derived from
# configured thresholds, and the years/weeks in which the track appears in the
# cached medal and weekly-top data.
def track_info(dbconn=None,**keys):
track = keys.get('track')
# Round trip through the database to obtain the stored track.
track = sqldb.get_track(sqldb.get_track_id(track,dbconn=dbconn),dbconn=dbconn)
alltimecharts = get_charts_tracks(timerange=alltime(),dbconn=dbconn)
#scrobbles = get_scrobbles_num(track=track,timerange=alltime())
# NOTE(review): `[0]` raises IndexError when the track has no chart entry.
c = [e for e in alltimecharts if e["track"] == track][0]
# The scrobble count is read from the chart entry here.
scrobbles = c["scrobbles"]
position = c["rank"]
cert = None
threshold_gold, threshold_platinum, threshold_diamond = malojaconfig["SCROBBLES_GOLD","SCROBBLES_PLATINUM","SCROBBLES_DIAMOND"]
# Highest threshold reached wins; cert stays None below the gold threshold.
if scrobbles >= threshold_diamond: cert = "diamond"
elif scrobbles >= threshold_platinum: cert = "platinum"
elif scrobbles >= threshold_gold: cert = "gold"
# NOTE(review): this return literal is incomplete in this view - it is never
# closed, and `scrobbles`, `position` and `cert` do not appear in it.
return {
"gold": [year for year in cached.medals_tracks if track in cached.medals_tracks[year]['gold']],
"silver": [year for year in cached.medals_tracks if track in cached.medals_tracks[year]['silver']],
"bronze": [year for year in cached.medals_tracks if track in cached.medals_tracks[year]['bronze']],
"topweeks":len([e for e in cached.weekly_toptracks if e == track])
def issues(dbconn=None):
    """Return the module-level ``ISSUES`` object; ``dbconn`` is accepted but unused."""
    return ISSUES
# Compares every artist name against every other artist name (case-insensitive
# equality, "the "/"a " prefixes, substring containment) and returns a dict with
# the lists "duplicates", "combined" and "newartists".
# NOTE(review): ARTISTS is not defined in the visible part of this file.
# NOTE(review): most if/elif branches below have no body in this view, and
# nothing visible ever appends to `combined`, `duplicates`, `newartists` or
# `lis`; as shown, the function cannot run. Restore it from version control
# before relying on it.
def check_issues():
combined = []
duplicates = []
newartists = []
# NOTE(review): itertools is only referenced in the commented-out code below;
# difflib is not referenced in this view at all.
import itertools
import difflib
# NOTE(review): despite the names, no sorting call is visible for these copies.
sortedartists = ARTISTS.copy()
reversesortedartists = sortedartists.copy()
for a in reversesortedartists:
nochange = cla.confirmedReal(a)
# `st` starts as the full name and has other artists' names removed from it below.
st = a
lis = []
reachedmyself = False
for ar in sortedartists:
if (ar != a) and not reachedmyself:
elif not reachedmyself:
reachedmyself = True
if (ar.lower() == a.lower()) or ("the " + ar.lower() == a.lower()) or ("a " + ar.lower() == a.lower()):
if (ar + " " in st) or (" " + ar in st):
st = st.replace(ar,"").strip()
elif (ar == st):
st = ""
if not nochange:
elif (ar in st) and len(ar)*2 > len(st):
# Drop joining words from what is left of the name.
st = st.replace("&","").replace("and","").replace("with","").strip()
if st not in ["", a]:
if len(st) < 5 and len(lis) == 1:
#check if we havent just randomly found the string in another word
#if (" " + st + " ") in lis[0] or (lis[0].endswith(" " + st)) or (lis[0].startswith(st + " ")):
elif len(st) < 5 and len(lis) > 1 and not nochange:
elif len(st) >= 5 and not nochange:
#check if we havent just randomly found the string in another word
if (" " + st + " ") in a or (a.endswith(" " + st)) or (a.startswith(st + " ")):
#for c in itertools.combinations(ARTISTS,3):
# l = list(c)
# print(l)
# l.sort(key=len,reverse=True)
# [full,a1,a2] = l
# if (a1 + " " + a2 in full) or (a2 + " " + a1 in full):
# combined.append((full,a1,a2))
#for c in itertools.combinations(ARTISTS,2):
# if
# if (c[0].lower == c[1].lower):
# duplicates.append((c[0],c[1]))
# elif (c[0] + " " in c[1]) or (" " + c[0] in c[1]) or (c[1] + " " in c[0]) or (" " + c[1] in c[0]):
# if (c[0] in c[1]):
# full, part = c[1],c[0]
# rest = c[1].replace(c[0],"").strip()
# else:
# full, part = c[0],c[1]
# rest = c[0].replace(c[1],"").strip()
# if rest in ARTISTS and full not in [c[0] for c in combined]:
# combined.append((full,part,rest))
# elif (c[0] in c[1]) or (c[1] in c[0]):
# duplicates.append((c[0],c[1]))
return {"duplicates":duplicates,"combined":combined,"newartists":newartists}
def get_predefined_rulesets(dbconn=None):
    """List the predefined rule files found in the ``predefined`` rules folder.

    Only ``.tsv`` files are considered whose base name contains an underscore
    and consists solely of ASCII letters, digits, ``-`` and ``_``. The part
    before the first underscore is taken as the author. The display name comes
    from a ``# NAME: `` marker on the first line of the file, falling back to
    the second underscore-separated part of the file name; the description
    comes from a ``# DESC: `` marker on the second line (empty if absent).

    Args:
        dbconn: accepted for interface compatibility; not used.

    Returns:
        A list of dicts with the keys ``file``, ``active``, ``name``,
        ``author`` and ``desc``. ``active`` is True when a file of the same
        name exists directly in the rules folder.
    """
    validchars = "-_abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
    rulesets = []
    for f in os.listdir(data_dir['rules']("predefined")):
        if not f.endswith(".tsv"):
            continue
        rawf = f.replace(".tsv", "")
        if not all(char in validchars for char in rawf):
            continue
        if "_" not in rawf:
            continue

        # Only the first two lines carry metadata.
        with open(data_dir['rules']("predefined", f)) as tsvfile:
            line1 = tsvfile.readline()
            line2 = tsvfile.readline()

        if "# NAME: " in line1:
            name = line1.replace("# NAME: ", "")
        else:
            name = rawf.split("_")[1]
        desc = line2.replace("# DESC: ", "") if "# DESC: " in line2 else ""
        author = rawf.split("_")[0]

        # Collect the assembled entry so the function does not always
        # return an empty list.
        rulesets.append({
            "file": rawf,
            "active": bool(os.path.exists(data_dir['rules'](f))),
            "name": name,
            "author": author,
            "desc": desc,
        })
    return rulesets
## Server operation
# Start-up routine of the database layer: marks the database as healthy and,
# at the end, as complete via the dbstatus flags.
# NOTE(review): several statements appear to be missing in this view - the two
# local imports are never used, `except IndexError:` has no matching `try:` and
# no body, and `firstscrobble` is never used. Restore from version control.
def start_db():
# Upgrade database
from .. import upgrade
# Load temporary tables
from . import associated
# From here on, functions guarded by waitfordb are allowed to run.
dbstatus['healthy'] = True
# inform time module about begin of scrobbling
firstscrobble = sqldb.get_scrobbles()[0]
except IndexError:
# create cached information
dbstatus['complete'] = True
# Search for strings
def db_search(query, type=None):
    """Return artists or tracks whose normalised name contains the normalised query.

    Args:
        query: the search string.
        type: ``"ARTIST"`` to search artist names, ``"TRACK"`` to search
            track titles; any other value yields an empty list.

    Returns:
        A list of matching artists, or of matching track dicts (matched on
        their ``'title'`` entry).
    """
    if type == "ARTIST":
        # Normalise the query once instead of once per candidate.
        needle = sqldb.normalize_name(query)
        return [a for a in sqldb.get_artists() if needle in sqldb.normalize_name(a)]
    if type == "TRACK":
        needle = sqldb.normalize_name(query)
        return [t for t in sqldb.get_tracks() if needle in sqldb.normalize_name(t['title'])]
    return []