
787 lines
20 KiB

import os
import math
import traceback
from bottle import response, static_file, request, FormsDict
from doreah.logging import log
from doreah.auth import authenticated_api, authenticated_api_with_alternate, authenticated_function
# nimrodel API
from nimrodel import EAPI as API
from nimrodel import Multi
from .. import database
from ..pkg_global.conf import malojaconfig, data_dir
from ..__pkginfo__ import VERSION
from ..malojauri import uri_to_internal, compose_querystring, internal_to_uri
from .. import images
from ._apikeys import apikeystore, api_key_correct
api = API(delay=True)
api.__apipath__ = "mlj_1"

# Maps exception types to handlers that produce ``(http_status, response_body)``.
# catch_exceptions iterates this mapping in insertion order and uses the first
# matching type, so the generic ``Exception`` fallback has to stay the last entry.
# NOTE(review): only the 'desc' field of each body is visible in this view of
# the file - confirm whether further fields belong into the payloads.
errors = {
    database.exceptions.MissingScrobbleParameters: lambda e: (400, {
        'desc': "The scrobble is missing needed parameters."
    }),
    database.exceptions.MissingEntityParameter: lambda e: (400, {
        'desc': "This API call is not valid without an entity (track or artist)."
    }),
    database.exceptions.EntityExists: lambda e: (409, {
        'desc': "This entity already exists in the database. Consider merging instead."
    }),
    database.exceptions.DatabaseNotBuilt: lambda e: (503, {
        'desc': "The database is being upgraded. Please try again later."
    }),
    images.MalformedB64: lambda e: (400, {
        'desc': "The provided base 64 string is not valid."
    }),
    # for http errors, use their status code
    # (the attribute that is read must be the one that is checked for: the
    # previous check looked for 'statuscode' and therefore always fell back to 500)
    Exception: lambda e: (getattr(e, 'status_code', 500), {
        'desc': "The server has encountered an exception."
    }),
}
def catch_exceptions(func):
    """Wrap an API handler so that raised exceptions become error responses.

    If ``func`` raises, the first entry of the module-level ``errors`` mapping
    whose exception type matches decides the reply: its status code is written
    to ``response.status`` and its body is returned instead of a result.

    :param func: The handler to protect.
    :return: The wrapper, carrying the docstring and annotations of ``func``.
    """
    def protector(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            for etype, handler in errors.items():
                if isinstance(e, etype):
                    status, body = handler(e)
                    # server-side failures would otherwise vanish without a trace
                    if isinstance(status, int) and status >= 500:
                        traceback.print_exc()
                    response.status = status
                    return body
            # no handler felt responsible: do not silently answer with None
            raise

    # only these two attributes are carried over, exactly as before
    protector.__doc__ = func.__doc__
    protector.__annotations__ = func.__annotations__
    return protector
def add_common_args_to_docstring(filterkeys=False,limitkeys=False,delimitkeys=False,amountkeys=False):
    """Create a decorator that appends shared parameter docs to a docstring.

    Every flag selects one group of ``:param`` lines that gets appended to the
    ``__doc__`` of the decorated function.

    :param bool filterkeys: Append the entity parameters (title, artist, associated).
    :param bool limitkeys: Append the time range parameters (from, until, in).
    :param bool delimitkeys: Append the step parameters (step, stepn, trail, cumulative).
    :param bool amountkeys: Append the paging parameters (page, perpage, max).
    :return: Decorator that extends the docstring in place and returns the same function.
    """
    timeformats = "Possible formats include '2022', '2022/08', '2022/08/01', '2022/W42', 'today', 'thismonth', 'monday', 'august'"

    additions = []
    if filterkeys:
        additions += [
            ":param string title: Track title",
            ":param string artist: Track artist. Can be specified multiple times.",
            ":param bool associated: Whether to include associated artists.",
        ]
    if limitkeys:
        additions += [
            f":param string from: Start of the desired time range. Can also be called since or start. {timeformats}",
            f":param string until: End of the desired range. Can also be called to or end. {timeformats}",
            f":param string in: Desired range. Can also be called within or during. {timeformats}",
        ]
    if delimitkeys:
        additions += [
            ":param string step: Step, e.g. month or week.",
            ":param int stepn: Number of base type units per step",
            ":param int trail: How many preceding steps should be factored in.",
            ":param bool cumulative: Instead of a fixed trail length, use all history up to this point.",
        ]
    if amountkeys:
        additions += [
            ":param int page: Page to show",
            ":param int perpage: Entries per page.",
            ":param int max: Legacy. Show first page with this many entries.",
        ]

    def decorator(func):
        if additions:
            # __doc__ is None for undocumented functions and cannot be extended with +=
            func.__doc__ = "\n".join([func.__doc__ or "", *additions, ""])
        return func

    return decorator
def test_server(key=None):
"""Pings the server. If an API key is supplied, the server will respond with 200
if the key is correct and 403 if it isn't. If no key is supplied, the server will
always respond with 200.

:param string key: An API key to be tested. Optional.
:return: status (String), error (String)
:rtype: Dictionary
"""
# NOTE(review): indentation, the closing braces and part of both returned
# mappings are missing in this view; the code lines are left exactly as found.
# A supplied key is checked against apikeystore; only a wrong key yields 403.
if key is not None and not apikeystore.check_key(key):
response.status = 403
return {
"error":"Wrong API key"
response.status = 200
return {
def server_info():
"""Returns basic information about the server.

:return: name (String), version (Tuple), versionstring (String), db_status (Mapping). Additional keys can be added at any point, but will not be removed within API version.
:rtype: Dictionary
"""
# NOTE(review): the contents of the returned mapping are cut off in this view.
return {
def get_scrobbles_external(**keys):
    """Returns a list of scrobbles.

    :return: list (List)
    :rtype: Dictionary
    """
    k_filter, k_time, _, k_amount, _ = uri_to_internal(keys,api=True)
    ckeys = {**k_filter, **k_time, **k_amount}

    result = database.get_scrobbles(**ckeys)

    # An infinite (or absent) 'perpage' means that no paging is applied.
    # Compare by value: an identity test against math.inf only works when the
    # very same float object is handed around.
    perpage = k_amount.get('perpage')
    if perpage is not None and perpage != math.inf:
        page = k_amount.get('page') or 0
        offset = page * perpage
        result = result[offset:offset + perpage]

    # NOTE(review): the returned mapping is cut off in this view of the file;
    # the key is taken from the docstring - confirm whether more keys belong here.
    return {
        "list": result
    }
def get_scrobbles_num_external(**keys):
"""Returns amount of scrobbles.

:return: amount (Integer)
:rtype: Dictionary
"""
# filter, time range and amount keys are forwarded to the database call
k_filter, k_time, _, k_amount, _ = uri_to_internal(keys)
ckeys = {**k_filter, **k_time, **k_amount}
result = database.get_scrobbles_num(**ckeys)
# NOTE(review): the contents of the returned mapping are cut off in this view;
# presumably `result` is delivered under the documented key - confirm.
return {
def get_tracks_external(**keys):
"""Returns all tracks (optionally of an artist).

:return: list (List)
:rtype: Dictionary
"""
# only the filter keys are used; the query is parsed with forceArtist=True
k_filter, _, _, _, _ = uri_to_internal(keys,forceArtist=True)
ckeys = {**k_filter}
result = database.get_tracks(**ckeys)
# NOTE(review): the contents of the returned mapping are cut off in this view;
# presumably `result` is delivered under the documented key - confirm.
return {
def get_artists_external():
"""Returns all artists.

:return: list (List)
:rtype: Dictionary"""
result = database.get_artists()
# NOTE(review): the contents of the returned mapping are cut off in this view;
# presumably `result` is delivered under the documented key - confirm.
return {
def get_charts_artists_external(**keys):
"""Returns artist charts

:return: list (List)
:rtype: Dictionary"""
# only the time range keys are forwarded to the database call
_, k_time, _, _, _ = uri_to_internal(keys)
ckeys = {**k_time}
result = database.get_charts_artists(**ckeys)
# NOTE(review): the contents of the returned mapping are cut off in this view;
# presumably `result` is delivered under the documented key - confirm.
return {
def get_charts_tracks_external(**keys):
"""Returns track charts

:return: list (List)
:rtype: Dictionary"""
# filter and time range keys are forwarded; the query is parsed with forceArtist=True
k_filter, k_time, _, _, _ = uri_to_internal(keys,forceArtist=True)
ckeys = {**k_filter, **k_time}
result = database.get_charts_tracks(**ckeys)
# NOTE(review): the contents of the returned mapping are cut off in this view;
# presumably `result` is delivered under the documented key - confirm.
return {
def get_pulse_external(**keys):
"""Returns amounts of scrobbles in specified time frames

:return: list (List)
:rtype: Dictionary"""
# the first four key groups returned by uri_to_internal are merged and forwarded
k_filter, k_time, k_internal, k_amount, _ = uri_to_internal(keys)
ckeys = {**k_filter, **k_time, **k_internal, **k_amount}
results = database.get_pulse(**ckeys)
# NOTE(review): the contents of the returned mapping are cut off in this view;
# presumably `results` is delivered under the documented key - confirm.
return {
def get_performance_external(**keys):
"""Returns artist's or track's rank in specified time frames

:return: list (List)
:rtype: Dictionary"""
# the first four key groups returned by uri_to_internal are merged and forwarded
k_filter, k_time, k_internal, k_amount, _ = uri_to_internal(keys)
ckeys = {**k_filter, **k_time, **k_internal, **k_amount}
results = database.get_performance(**ckeys)
# NOTE(review): the contents of the returned mapping are cut off in this view;
# presumably `results` is delivered under the documented key - confirm.
return {
def get_top_artists_external(**keys):
"""Returns respective number 1 artists in specified time frames

:return: list (List)
:rtype: Dictionary"""
# only the second and third key group (k_time, k_internal) are forwarded
_, k_time, k_internal, _, _ = uri_to_internal(keys)
ckeys = {**k_time, **k_internal}
results = database.get_top_artists(**ckeys)
# NOTE(review): the contents of the returned mapping are cut off in this view;
# presumably `results` is delivered under the documented key - confirm.
return {
def get_top_tracks_external(**keys):
"""Returns respective number 1 tracks in specified time frames

:return: list (List)
:rtype: Dictionary"""
# only the second and third key group (k_time, k_internal) are forwarded
_, k_time, k_internal, _, _ = uri_to_internal(keys)
ckeys = {**k_time, **k_internal}
results = database.get_top_tracks(**ckeys)
# NOTE(review): the contents of the returned mapping are cut off in this view;
# presumably `results` is delivered under the documented key - confirm.
return {
def artist_info_external(**keys):
    """Returns information about an artist

    :return: artist (String), scrobbles (Integer), position (Integer), associated (List), medals (Mapping), topweeks (Integer)
    :rtype: Dictionary"""
    # of the five key groups only the entity filter is relevant here
    filterkeys, _time, _internal, _amount, _special = uri_to_internal(keys,forceArtist=True)
    lookup = {**filterkeys}
    info = database.artist_info(**lookup)
    return info
def track_info_external(artist:Multi[str]=[],**keys):
    """Returns information about a track

    :return: track (Mapping), scrobbles (Integer), position (Integer), medals (Mapping), certification (String), topweeks (Integer)
    :rtype: Dictionary"""
    # transform into a multidict so we can use our normal uri_to_internal function
    keys = FormsDict(keys)
    # the default list is only read, never modified, so sharing it between calls is harmless
    for a in artist:
        keys.append("artist",a)
    k_filter, _, _, _, _ = uri_to_internal(keys,forceTrack=True)
    ckeys = {**k_filter}

    return database.track_info(**ckeys)
# NOTE(review): this definition is heavily truncated in this view - the
# parameter list, the end of the docstring, most of the literals and all
# closing brackets are missing, and the last line carries residue of what looks
# like the next route decorator. Code lines are left exactly as found.
# The names used below (artist, artists, nofix, auth_result, extra_kwargs)
# presumably are parameters of this function - confirm against the full file.
def post_scrobble(
"""Submit a new scrobble.

:param string artist: Artist. Can be submitted multiple times as query argument for multiple artists.
:param list artists: List of artists.
:param string title: Title of the track.
:param string album: Name of the album. Optional.
:param list albumartists: Album artists. Optional.
:param int duration: Actual listened duration of the scrobble in seconds. Optional.
:param int length: Total length of the track in seconds. Optional.
:param int time: UNIX timestamp of the scrobble. Optional, not needed if scrobble is at time of request.
:param flag nofix: Skip server-side metadata parsing. Optional.

:return: status (String), track (Mapping)
:rtype: Dictionary
"""
rawscrobble = {
# both ways of supplying artists are combined into one list
'track_artists':(artist or []) + artists,
# for logging purposes, don't pass values that we didn't actually supply
rawscrobble = {k:rawscrobble[k] for k in rawscrobble if rawscrobble[k]}
result = database.incoming_scrobble(
# requests authenticated natively count as coming from the browser
client='browser' if auth_result.get('doreah_native_auth_check') else auth_result.get('client'),
# metadata fixing is only skipped when the nofix flag was supplied
fix=(nofix is None)
responsedict = {
'status': 'success',
'track': {
'desc':f"Scrobbled {result['track']['title']} by {', '.join(result['track']['artists'])}",
# unknown keys do not fail the request, they are reported as warnings
if extra_kwargs:
responsedict['warnings'] += [
'desc':"This key was not recognized by the server and has been discarded."}
for k in extra_kwargs
if artist and artists:
responsedict['warnings'] += [
'desc':"These two fields are meant as alternative methods to submit information. Use of both is discouraged, but works at the moment."}
# an empty warnings list is not sent at all
if len(responsedict['warnings']) == 0: del responsedict['warnings']
return responsedict"addpicture")
def add_picture(b64,artist:Multi=[],title=None,albumtitle=None):
    """Uploads a new image for an artist, track or album.

    :param string b64: Base 64 representation of the image
    :param string artist: Artist name. Can be supplied multiple times for tracks with multiple artists.
    :param string title: Title of the track. Optional.
    :param string albumtitle: Title of the album. Optional, ignored if a track title is given.

    :return: status (String), url
    :rtype: Dictionary
    """
    # build a multidict so the regular uri_to_internal parsing can be used
    keys = FormsDict()
    # the default list is only read, never modified
    for a in artist:
        keys.append("artist",a)
    if title is not None: keys.append("title",title)
    elif albumtitle is not None: keys.append("albumtitle",albumtitle)

    k_filter, _, _, _, _ = uri_to_internal(keys)
    # for tracks and albums the entity itself is what gets passed on
    if "track" in k_filter: k_filter = k_filter["track"]
    elif "album" in k_filter: k_filter = k_filter["album"]
    url = images.set_image(b64,**k_filter)

    return {
        'status': 'success',
        'url': url
    }
def import_rulemodule(**keys):
    """Internal Use Only

    Activates a predefined rule file by symlinking it into the rules folder, or
    deactivates it again by removing that file when the key ``remove`` is present.
    """
    filename = keys.get("filename")
    remove = keys.get("remove") is not None
    # whitelist without dots and path separators, so the resulting name stays a plain file name
    validchars = "-_abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
    filename = "".join(c for c in filename if c in validchars)

    if remove:
        log("Deactivating predefined rulefile " + filename)
        os.remove(data_dir['rules'](filename + ".tsv"))
    else:
        # must not run after a removal, otherwise the file would be linked right back
        log("Importing predefined rulefile " + filename)
        os.symlink(data_dir['rules']("predefined/" + filename + ".tsv"),data_dir['rules'](filename + ".tsv"))
# NOTE(review): `dbstatus` and `CleanerAgent` are not imported in the visible
# part of this file, and the imported `fix` is not called in the visible lines -
# this definition is presumably truncated here; confirm against the full file.
def rebuild(**keys):
"""Internal Use Only"""
log("Database rebuild initiated!")
dbstatus['rebuildinprogress'] = True
from ..proccontrol.tasks.fixexisting import fix
# the module-level cleaner instance is replaced by a fresh one
global cla
cla = CleanerAgent()
def search(**keys):
    """Internal Use Only

    Searches artists, tracks and albums for the key ``query`` and returns the
    best matches of each kind, at most ``max`` entries per kind if that key is given.
    """
    query = keys.get("query")
    max_ = keys.get("max")
    if max_ is not None: max_ = int(max_)
    query = query.lower()

    artists = database.db_search(query,type="ARTIST")
    tracks = database.db_search(query,type="TRACK")
    albums = database.db_search(query,type="ALBUM")

    def relevance(name):
        # if the string begins with the query it's a better match, if a word in it begins with it, still good
        # also, shorter is better (because longer titles would be easier to further specify)
        lowered = name.lower()
        if lowered.startswith(query): rank = 0
        elif " " + query in lowered: rank = 1
        else: rank = 2
        return (rank,len(name))

    artists.sort(key=relevance)
    tracks.sort(key=lambda t: relevance(t["title"]))
    albums.sort(key=lambda al: relevance(al["albumtitle"]))

    # add links
    # only entries that are actually returned get their link and image resolved
    artists_result = [
        {
            'artist': a,
            'link': "/artist?" + compose_querystring(internal_to_uri({"artist": a})),
            'image': images.get_artist_image(a)
        }
        for a in artists[:max_]
    ]

    tracks_result = [
        {
            'track': t,
            'link': "/track?" + compose_querystring(internal_to_uri({"track":t})),
            'image': images.get_track_image(t)
        }
        for t in tracks[:max_]
    ]

    albums_result = []
    for al in albums[:max_]:
        result = {
            'album': al,
            'link': "/album?" + compose_querystring(internal_to_uri({"album":al})),
            'image': images.get_album_image(al)
        }
        # albums without artists get the configured placeholder for display
        if not result['album']['artists']: result['album']['displayArtist'] = malojaconfig["DEFAULT_ALBUM_ARTIST"]
        albums_result.append(result)

    return {"artists":artists_result,"tracks":tracks_result,"albums":albums_result}
def newrule(**keys):
"""Internal Use Only"""
# Currently does nothing: the former implementations below are disabled.
# TODO after implementing new rule system
#tsv.add_entry(data_dir['rules']("webmade.tsv"),[k for k in keys])
#addEntry("rules/webmade.tsv",[k for k in keys])
# NOTE(review): as shown here the function consists of its docstring only and
# returns None; the actual body is presumably cut off in this view - confirm.
def set_settings(**keys):
"""Internal Use Only"""
# NOTE(review): as shown here the function consists of its docstring only and
# returns None; the actual body is presumably cut off in this view - confirm.
def set_apikeys(**keys):
"""Internal Use Only"""
def import_scrobbles(identifier):
"""Internal Use Only"""
# NOTE(review): the imported function shadows this function's own name inside
# the body; the call that uses it (and `identifier`) is not visible in this view.
from ..thirdparty import import_scrobbles
def get_backup(**keys):
    """Internal Use Only

    Creates a backup archive in the system's temp folder and serves it as a static file.
    """
    import tempfile
    from ..proccontrol.tasks.backup import backup

    target_dir = tempfile.gettempdir()
    archive_path = backup(target_dir)
    # static_file expects the name relative to its root folder
    archive_name = os.path.basename(archive_path)
    return static_file(archive_name,root=target_dir)
def get_export(**keys):
    """Internal Use Only

    Writes an export into the system's temp folder and serves it as a static file.
    """
    from ..proccontrol.tasks.export import export
    import tempfile

    tmpfolder = tempfile.gettempdir()
    resultfile = export(tmpfolder)
    # static_file expects the name relative to its root folder
    return static_file(os.path.basename(resultfile),root=tmpfolder)
def delete_scrobble(timestamp):
"""Internal Use Only"""
# the scrobble to remove is identified by its timestamp
result = database.remove_scrobble(timestamp)
# NOTE(review): the returned mapping is cut off in this view; `result` is not
# used in the visible lines.
return {
"desc":f"Scrobble was deleted!"
def edit_artist(id,name):
"""Internal Use Only"""
# id and new name are handed to the database layer unchanged
result = database.edit_artist(id,name)
# NOTE(review): the contents of the returned mapping are cut off in this view.
return {
def edit_track(id,title):
"""Internal Use Only"""
# the new title is wrapped into a mapping for the database layer
result = database.edit_track(id,{'title':title})
# NOTE(review): the contents of the returned mapping are cut off in this view.
return {
def edit_album(id,albumtitle):
"""Internal Use Only"""
# the new album title is wrapped into a mapping for the database layer
result = database.edit_album(id,{'albumtitle':albumtitle})
# NOTE(review): the contents of the returned mapping are cut off in this view.
return {
def merge_tracks(target_id,source_ids):
"""Internal Use Only"""
# target and sources are handed to the database layer unchanged
result = database.merge_tracks(target_id,source_ids)
# NOTE(review): the contents of the returned mapping are cut off in this view.
return {
def merge_artists(target_id,source_ids):
"""Internal Use Only"""
# target and sources are handed to the database layer unchanged
result = database.merge_artists(target_id,source_ids)
# NOTE(review): the contents of the returned mapping are cut off in this view.
return {
# NOTE(review): this definition reuses the name `merge_artists` although its
# body calls database.merge_albums, so it rebinds the module-level name of the
# artist variant defined before it. It presumably should be named
# `merge_albums` - confirm (route registration is not visible in this view).
def merge_artists(target_id,source_ids):
"""Internal Use Only"""
result = database.merge_albums(target_id,source_ids)
# NOTE(review): the contents of the returned mapping are cut off in this view.
return {
def reparse_scrobble(timestamp):
"""Internal Use Only"""
# the scrobble to reparse is identified by its timestamp
result = database.reparse_scrobble(timestamp)
# NOTE(review): both returned mappings are cut off in this view (closing braces
# and possibly further keys are missing). A truthy result is reported as a
# successful reparse, anything else as "not changed".
if result:
return {
"desc":f"Scrobble was reparsed!",
return {
"desc":"The scrobble was not changed."