# mirror of https://github.com/krateng/maloja.git
# synced 2023-08-10 21:12:55 +03:00
# 183 lines, 6.4 KiB, Python
import urllib.parse, urllib.request
|
|
import json
|
|
import base64
|
|
from doreah.settings import get_settings
|
|
from doreah.logging import log
|
|
import hashlib
|
|
import xml.etree.ElementTree as ET
|
|
|
|
### PICTURES
|
|
|
|
|
|
# Artist image providers, tried in list order by api_request_artist().
# Each entry has a display "name" and a list of "steps":
#   ("get", url_template)                   - fetch the URL, decode the JSON reply
#   ("post", url_template, headers, form)   - POST the form data, decode the JSON reply
#   ("parse", [key_or_index, ...])          - walk into the previous result
# URL templates may contain {artiststring} (URL-quoted artist name) and {var}
# (result of the previous step). A provider is only registered when all the
# settings it needs are configured (neither None nor the placeholder "ASK").
apis_artists = []

if get_settings("LASTFM_API_KEY") not in [None, "ASK"] and get_settings("FANARTTV_API_KEY") not in [None, "ASK"]:
    apis_artists.append({
        "name": "LastFM + Fanart.tv",
        "steps": [
            # HTTPS (as the track lookup already uses) so the API key is not
            # sent in clear text.
            ("get", "https://ws.audioscrobbler.com/2.0/?method=artist.getinfo&artist={artiststring}&api_key=" + str(get_settings("LASTFM_API_KEY")) + "&format=json"),
            ("parse", ["artist", "mbid"]),
            # NOTE(review): fanart.tv is still queried over plain HTTP; confirm
            # the service's HTTPS support before switching this URL as well.
            ("get", "http://webservice.fanart.tv/v3/music/{var}?api_key=" + str(get_settings("FANARTTV_API_KEY"))),
            ("parse", ["artistthumb", 0, "url"]),
        ],
    })

if get_settings("SPOTIFY_API_ID") not in [None, "ASK"] and get_settings("SPOTIFY_API_SECRET") not in [None, "ASK"]:
    apis_artists.append({
        "name": "Spotify",
        "steps": [
            # Client-credentials token request: HTTP Basic auth built from "id:secret"
            ("post", "https://accounts.spotify.com/api/token", {"Authorization": "Basic " + base64.b64encode(bytes(get_settings("SPOTIFY_API_ID") + ":" + get_settings("SPOTIFY_API_SECRET"), encoding="utf-8")).decode("utf-8")}, {"grant_type": "client_credentials"}),
            ("parse", ["access_token"]),
            ("get", "https://api.spotify.com/v1/search?q={artiststring}&type=artist&access_token={var}"),
            ("parse", ["artists", "items", 0, "images", 0, "url"]),
        ],
    })
|
|
|
|
# Track image providers, tried in list order. Every entry carries a display
# "name" and the "steps" to run; URL templates may use {artiststring},
# {titlestring} and {var} (the result of the previous step). Providers are
# only registered when their settings are neither None nor "ASK".
apis_tracks = []

if not (get_settings("LASTFM_API_KEY") in (None, "ASK")):
    apis_tracks.append(dict(
        name="LastFM",
        steps=[
            (
                "get",
                "".join((
                    "https://ws.audioscrobbler.com/2.0/?method=track.getinfo&track={titlestring}&artist={artiststring}&api_key=",
                    get_settings("LASTFM_API_KEY"),
                    "&format=json",
                )),
            ),
            ("parse", ["track", "album", "image", 3, "#text"]),
        ],
    ))

if not (get_settings("SPOTIFY_API_ID") in (None, "ASK") or get_settings("SPOTIFY_API_SECRET") in (None, "ASK")):
    apis_tracks.append(dict(
        name="Spotify",
        steps=[
            (
                "post",
                "https://accounts.spotify.com/api/token",
                # HTTP Basic credentials: base64 of "id:secret"
                {
                    "Authorization": "Basic " + base64.b64encode(
                        (get_settings("SPOTIFY_API_ID") + ":" + get_settings("SPOTIFY_API_SECRET")).encode("utf-8")
                    ).decode("utf-8"),
                },
                {"grant_type": "client_credentials"},
            ),
            ("parse", ["access_token"]),
            ("get", "https://api.spotify.com/v1/search?q={artiststring}%20{titlestring}&type=track&access_token={var}"),
            ("parse", ["tracks", "items", 0, "album", "images", 0, "url"]),
        ],
    ))
|
|
|
|
|
|
def _run_api_steps(steps, var, **placeholders):
    """Execute one provider's step list and return the resulting image URL.

    steps        -- sequence of step tuples: ("get", url_template),
                    ("post", url_template, headers, form_fields) or
                    ("parse", [key_or_index, ...])
    var          -- initial value of the {var} placeholder; replaced by the
                    result of every step
    placeholders -- further values available to the URL templates

    Raises ValueError when the final result is not a non-empty string; any
    network, JSON or lookup error raised by a step propagates to the caller.
    """
    for step in steps:
        action = step[0]
        if action == "get":
            url = step[1].format(var=var, **placeholders)
            # the context manager closes the connection even if decoding fails
            with urllib.request.urlopen(url) as response:
                var = json.loads(response.read())
        elif action == "post":
            req = urllib.request.Request(
                url=step[1].format(var=var, **placeholders),
                method="POST",
                headers=step[2],
                data=bytes(urllib.parse.urlencode(step[3]), encoding="utf-8")
            )
            with urllib.request.urlopen(req) as response:
                var = json.loads(response.read())
        elif action == "parse":
            for node in step[1]:
                var = var[node]

    # Explicit check instead of `assert`, which is stripped when Python runs with -O
    if not isinstance(var, str) or var == "":
        raise ValueError("API did not yield an image URL")
    return var


def api_request_artist(artist):
    """Return an image URL for *artist* from the first API that yields one.

    Every provider in apis_artists is tried in order; a provider that fails
    for any reason is logged and skipped. Returns None if no provider succeeds.
    """
    for api in apis_artists:
        log("API: " + api["name"] + "; Image request: " + artist, module="external")
        try:
            artiststring = urllib.parse.quote(artist)
            return _run_api_steps(api["steps"], artiststring, artiststring=artiststring)
        except Exception as exc:
            # Best effort: fall through to the next provider. `Exception`
            # (not a bare except) lets KeyboardInterrupt/SystemExit propagate.
            log("API: " + api["name"] + "; Image request failed: " + repr(exc), module="external")
    return None


def api_request_track(track):
    """Return an image URL for *track* from the first API that yields one.

    track -- pair (artists, title), where artists is a list of artist names.

    Every provider in apis_tracks is tried in order with all artists joined
    by ", ". When a provider fails and the track does not have exactly one
    artist, the lookup is retried for every single artist on their own (each
    retry consults all providers again) before moving on to the next provider.
    Returns None if nothing is found.
    """
    artists, title = track
    for api in apis_tracks:
        log("API: " + api["name"] + "; Image request: " + "/".join(artists) + " - " + title, module="external")
        try:
            artiststring = urllib.parse.quote(", ".join(artists))
            titlestring = urllib.parse.quote(title)
            return _run_api_steps(
                api["steps"],
                artiststring + titlestring,
                artiststring=artiststring,
                titlestring=titlestring
            )
        except Exception as exc:
            # `Exception` (not a bare except) lets KeyboardInterrupt/SystemExit propagate.
            log("API: " + api["name"] + "; Image request failed: " + repr(exc), module="external")
            if len(artists) != 1:
                # try the same track with every single artist
                for a in artists:
                    result = api_request_track(([a], title))
                    if result is not None:
                        return result
    return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
### SCROBBLING
|
|
|
|
def lfmbuild(parameters):
    """Sign *parameters* and return them as a complete query string.

    The signature is the MD5 hex digest of every "<name><value>" pair
    (ordered by parameter name) followed by the LASTFM_API_SECRET setting.
    It is appended to the URL-encoded parameters as "api_sig".
    """
    ordered_names = sorted(map(str, parameters))
    pairs = [name + str(parameters[name]) for name in ordered_names]
    # hashing the concatenation equals feeding both parts to update() in turn
    payload = utf("".join(pairs)) + utf(get_settings("LASTFM_API_SECRET"))
    signature = hashlib.md5(payload).hexdigest()
    query = urllib.parse.urlencode(parameters)
    return query + "&api_sig=" + signature
|
|
|
|
def utf(st):
    """Return the string *st* encoded as UTF-8 bytes."""
    encoded = st.encode("UTF-8")
    return encoded
|
|
|
|
|
|
|
|
# Services that scrobbles are forwarded to by proxy_scrobble().
# Each entry provides a display "name", the "scrobbleurl" the request body is
# sent to, and "requestbody": a callable taking (artists, title, timestamp)
# that returns the signed, URL-encoded request body as a string.
# The service is only registered when all of its settings are configured
# (neither None nor the placeholder "ASK").
apis_scrobble = []

if (
    get_settings("LASTFM_API_SK") not in [None, "ASK"]
    and get_settings("LASTFM_API_SECRET") not in [None, "ASK"]
    and get_settings("LASTFM_API_KEY") not in [None, "ASK"]
):
    apis_scrobble.append({
        "name": "LastFM",
        # HTTPS so the session key and API key are not transmitted in clear text
        "scrobbleurl": "https://ws.audioscrobbler.com/2.0/",
        # Settings are read at call time, not at registration time
        "requestbody": lambda artists, title, timestamp: lfmbuild({
            "method": "track.scrobble",
            "artist[0]": ", ".join(artists),
            "track[0]": title,
            "timestamp": timestamp,
            "api_key": get_settings("LASTFM_API_KEY"),
            "sk": get_settings("LASTFM_API_SK"),
        }),
    })
|
|
|
|
|
|
|
|
|
|
def proxy_scrobble(artists, title, timestamp):
    """Forward a scrobble to every service registered in apis_scrobble.

    artists   -- list of artist names
    title     -- track title
    timestamp -- time of the scrobble, passed through to the service's
                 request-body builder

    The XML reply of each service is inspected and the outcome is logged:
    a scrobble counts as accepted only when the reply has status "ok" and its
    <scrobbles> element reports ignored="0"; every other reply is logged as
    not accepted. Network and XML parsing errors propagate to the caller.
    """
    for api in apis_scrobble:
        body = utf(api["requestbody"](artists, title, timestamp))
        # the context manager closes the connection once the reply is read
        with urllib.request.urlopen(api["scrobbleurl"], data=body) as response:
            xml = response.read()
        data = ET.fromstring(xml)

        # find() returns None when the reply carries no <scrobbles> element
        scrobbles = data.find("scrobbles")
        accepted = (
            data.attrib.get("status") == "ok"
            and scrobbles is not None
            and scrobbles.attrib.get("ignored") == "0"
        )

        if accepted:
            log(api["name"] + ": Scrobble accepted: " + "/".join(artists) + " - " + title)
        else:
            log(api["name"] + ": Scrobble not accepted: " + "/".join(artists) + " - " + title)
|