# mirror of https://github.com/krateng/maloja.git
# 359 lines, 11 KiB, Python
import os, datetime, re
|
|
import json, csv
|
|
|
|
from doreah.io import col, ask, prompt
|
|
|
|
from ...cleanup import *
|
|
from ...pkg_global.conf import data_dir
|
|
|
|
|
|
# Module-wide cleaner; its fullclean() is applied to the artists and title
# of every scrobble that gets imported.
c = CleanerAgent()


def _report_nothing(msg):
    """Reporter for statuses whose messages are deliberately not shown."""
    return None


def _report_in(colorname):
    """Build a reporter that prints its message wrapped in the `col` entry `colorname`."""
    def report(msg):
        # the colour is resolved lazily, only when a message is actually reported
        print(col[colorname](msg))
    return report


# Maps every status a parser can yield to the callable that reports the
# accompanying message.
outputs = {
    "CONFIDENT_IMPORT": _report_nothing,
    "UNCERTAIN_IMPORT": _report_in('orange'),
    # confident skips are kept quiet; _report_in('ffcba4') would make them visible
    "CONFIDENT_SKIP": _report_nothing,
    "UNCERTAIN_SKIP": _report_in('indianred'),
    "FAIL": _report_in('red'),
}
|
|
|
|
|
|
def import_scrobbles(inputf):
    """Import all scrobbles of one export file into the database.

    The export format is recognised purely from the base name of `inputf`
    and decides which `parse_*` generator reads the file. Every item the
    parser yields is counted and reported by its status; importable
    scrobbles are cleaned, given a unique timestamp and passed to
    `add_scrobbles` in batches.

    Args:
        inputf: Path of the export file.

    Returns:
        dict mapping each status ("CONFIDENT_IMPORT", "UNCERTAIN_IMPORT",
        "CONFIDENT_SKIP", "UNCERTAIN_SKIP", "FAIL") to the number of entries
        that ended up with it. All counts are 0 when the file name is not
        recognised.
    """

    result = {
        "CONFIDENT_IMPORT": 0,
        "UNCERTAIN_IMPORT": 0,
        "CONFIDENT_SKIP": 0,
        "UNCERTAIN_SKIP": 0,
        "FAIL": 0
    }

    filename = os.path.basename(inputf)

    # Patterns are raw strings so that `\.` reaches the regex engine as written
    # instead of being an invalid string escape. re.match anchors only at the
    # start of the name.
    if re.match(r".*\.csv",filename):
        typeid,typedesc = "lastfm","Last.fm"
        importfunc = parse_lastfm

    elif re.match(r"endsong_[0-9]+\.json",filename):
        typeid,typedesc = "spotify","Spotify"
        importfunc = parse_spotify_full

    elif re.match(r"StreamingHistory[0-9]+\.json",filename):
        typeid,typedesc = "spotify","Spotify"
        importfunc = parse_spotify_lite

    elif re.match(r"maloja_export_[0-9]+\.json",filename):
        typeid,typedesc = "maloja","Maloja"
        importfunc = parse_maloja

    # username_lb-YYYY-MM-DD.json
    elif re.match(r".*_lb-[0-9-]+\.json",filename):
        typeid,typedesc = "listenbrainz","ListenBrainz"
        importfunc = parse_listenbrainz

    else:
        print("File",inputf,"could not be identified as a valid import source.")
        return result

    # Function-local import, done only once there is actually something to import.
    from ...database.sqldb import add_scrobbles

    print(f"Parsing {col['yellow'](inputf)} as {col['cyan'](typedesc)} export")
    print("This could take a while...")

    timestamps = set()
    scrobblebuffer = []

    for status,scrobble,msg in importfunc(inputf):
        result[status] += 1
        outputs[status](msg)

        if status not in ('CONFIDENT_IMPORT','UNCERTAIN_IMPORT'):
            continue

        # prevent duplicate timestamps: shift forward one second at a time
        # until the timestamp is unused within this import
        while scrobble['scrobble_time'] in timestamps:
            scrobble['scrobble_time'] += 1
        timestamps.add(scrobble['scrobble_time'])

        # clean up
        (scrobble['track_artists'],scrobble['track_title']) = c.fullclean(scrobble['track_artists'],scrobble['track_title'])

        # extra info
        # saving this in the scrobble instead of the track because for now it's not meant
        # to be authoritative information, just payload of the scrobble
        extrainfo = {}
        if scrobble.get('album_name'):
            extrainfo['album_name'] = scrobble['album_name']

        scrobblebuffer.append({
            "time":scrobble['scrobble_time'],
            "track":{
                "artists":scrobble['track_artists'],
                "title":scrobble['track_title'],
                "length":scrobble['track_length'],
            },
            "duration":scrobble['scrobble_duration'],
            "origin":"import:" + typeid,
            "extra":extrainfo
        })

        # flush the buffer to the database every 1000 imported scrobbles
        imported = result['CONFIDENT_IMPORT'] + result['UNCERTAIN_IMPORT']
        if imported % 1000 == 0:
            print(f"Imported {imported} scrobbles...")
            add_scrobbles(scrobblebuffer)
            scrobblebuffer = []

    # whatever is left over after the last full batch
    add_scrobbles(scrobblebuffer)

    msg = f"Successfully imported {result['CONFIDENT_IMPORT'] + result['UNCERTAIN_IMPORT']} scrobbles"
    if result['UNCERTAIN_IMPORT'] > 0:
        warningmsg = col['orange'](f"{result['UNCERTAIN_IMPORT']} Warning{'s' if result['UNCERTAIN_IMPORT'] != 1 else ''}!")
        msg += f" ({warningmsg})"
    print(msg)

    msg = f"Skipped {result['CONFIDENT_SKIP'] + result['UNCERTAIN_SKIP']} scrobbles"
    if result['UNCERTAIN_SKIP'] > 0:
        warningmsg = col['indianred'](f"{result['UNCERTAIN_SKIP']} Warning{'s' if result['UNCERTAIN_SKIP'] != 1 else ''}!")
        msg += f" ({warningmsg})"
    print(msg)

    if result['FAIL'] > 0:
        print(col['red'](f"{result['FAIL']} Error{'s' if result['FAIL'] != 1 else ''}!"))

    return result
|
|
|
|
def parse_spotify_lite(inputf):
    """Parse Spotify's short streaming history export (StreamingHistory*.json).

    All matching files in the folder of `inputf` are collected; unless the
    given file is the only one, the user is asked whether to import all of
    them together.

    Args:
        inputf: Path of one StreamingHistory file.

    Yields:
        (status, scrobble, message) tuples. `scrobble` is a dict with the keys
        track_title, track_artists, track_length, scrobble_time,
        scrobble_duration and album_name for importable entries, and None for
        skipped or failed ones.
    """
    pth = os.path
    inputfolder = pth.relpath(pth.dirname(pth.abspath(inputf)))
    filenames = re.compile(r'StreamingHistory[0-9]+\.json')
    # sorted so the import order does not depend on os.listdir's arbitrary order
    inputfiles = sorted(
        pth.join(inputfolder,f) for f in os.listdir(inputfolder) if filenames.match(f)
    )

    if not inputfiles:
        print("No files found!")
        return

    # Compare absolute paths: the collected names are spelled relative to the
    # working directory and would otherwise rarely equal the path as given.
    if [pth.abspath(f) for f in inputfiles] != [pth.abspath(inputf)]:
        print("Spotify files should all be imported together to identify duplicates across the whole dataset.")
        if not ask("Import " + ", ".join(col['yellow'](i) for i in inputfiles) + "?",default=True):
            inputfiles = [inputf]

    for path in inputfiles:

        print("Importing",col['yellow'](path),"...")
        # JSON exports are UTF-8; do not depend on the locale's default encoding
        with open(path,'r',encoding='utf-8') as inputfd:
            data = json.load(inputfd)

        for entry in data:

            try:
                played = int(entry['msPlayed'] / 1000)
                # endTime carries no timezone; Spotify documents it as UTC, so it
                # must not be interpreted in the machine's local timezone
                timestamp = int(
                    datetime.datetime.strptime(entry['endTime'],"%Y-%m-%d %H:%M")
                    .replace(tzinfo=datetime.timezone.utc)
                    .timestamp()
                )
                artist = entry['artistName']
                title = entry['trackName']
            except Exception as e:
                yield ('FAIL',None,f"{entry} could not be parsed. Scrobble not imported. ({repr(e)})")
                continue

            if played < 30:
                yield ('CONFIDENT_SKIP',None,f"{entry} is shorter than 30 seconds, skipping...")
                continue

            yield ("CONFIDENT_IMPORT",{
                'track_title':title,
                'track_artists': artist,
                'track_length': None,
                'scrobble_time': timestamp,
                'scrobble_duration':played,
                'album_name': None
            },'')

        print()
|
|
|
|
|
|
def parse_spotify_full(inputf):
    """Parse Spotify's extended streaming history export (endsong_*.json).

    All matching files in the folder of `inputf` are collected; unless the
    given file is the only one, the user is asked whether to import all of
    them together. Duplicates are detected across all files that are read.

    Args:
        inputf: Path of one endsong file.

    Yields:
        (status, scrobble, message) tuples. `scrobble` is a dict with the keys
        track_title, track_artists, track_length, album_name, scrobble_time
        and scrobble_duration for importable entries, and None for skipped or
        failed ones.
    """
    pth = os.path
    inputfolder = pth.relpath(pth.dirname(pth.abspath(inputf)))
    filenames = re.compile(r'endsong_[0-9]+\.json')
    # sorted so that which copy of a duplicate is kept does not depend on
    # os.listdir's arbitrary order
    inputfiles = sorted(
        pth.join(inputfolder,f) for f in os.listdir(inputfolder) if filenames.match(f)
    )

    if not inputfiles:
        print("No files found!")
        return

    # Compare absolute paths: the collected names are spelled relative to the
    # working directory and would otherwise rarely equal the path as given.
    if [pth.abspath(f) for f in inputfiles] != [pth.abspath(inputf)]:
        print("Spotify files should all be imported together to identify duplicates across the whole dataset.")
        if not ask("Import " + ", ".join(col['yellow'](i) for i in inputfiles) + "?",default=True):
            inputfiles = [inputf]

    # we keep timestamps here as well to remove duplicates because spotify's export
    # is messy - this is specific to this import type and should not be mixed with
    # the outer function timestamp check (which is there to fix duplicate timestamps
    # that are assumed to correspond to actually distinct plays)
    timestamps = {}             # exact timestamp -> [(artist,title), ...]
    inaccurate_timestamps = {}  # timestamp // 10 -> [(timestamp, track uri, ms_played), ...]

    for path in inputfiles:

        print("Importing",col['yellow'](path),"...")
        # JSON exports are UTF-8; do not depend on the locale's default encoding
        with open(path,'r',encoding='utf-8') as inputfd:
            data = json.load(inputfd)

        for entry in data:

            try:
                played = int(entry['ms_played'] / 1000)
                # a missing or null offline_timestamp is handled like 0 (fall
                # back to `ts` below) instead of failing the whole entry
                timestamp = int((entry.get('offline_timestamp') or 0) / 1000)
                artist = entry['master_metadata_album_artist_name']
                title = entry['master_metadata_track_name']
                album = entry['master_metadata_album_album_name']

                if title is None:
                    yield ('CONFIDENT_SKIP',None,f"{entry} has no title, skipping...")
                    continue
                if artist is None:
                    yield ('CONFIDENT_SKIP',None,f"{entry} has no artist, skipping...")
                    continue
                if played < 30:
                    yield ('CONFIDENT_SKIP',None,f"{entry} is shorter than 30 seconds, skipping...")
                    continue

                # if offline_timestamp is a proper number, we treat it as
                # accurate and check duplicates by that exact timestamp
                if timestamp != 0:

                    if (artist,title) in timestamps.get(timestamp,()):
                        yield ('CONFIDENT_SKIP',None,f"{entry} seems to be a duplicate, skipping...")
                        continue

                    status = 'CONFIDENT_IMPORT'
                    msg = ''
                    timestamps.setdefault(timestamp,[]).append((artist,title))

                # if it's 0, we use ts instead, but identify duplicates differently
                # (cause the ts is not accurate)
                else:

                    timestamp = int(
                        datetime.datetime.strptime(entry['ts'].replace('Z','+0000'),"%Y-%m-%dT%H:%M:%S%z").timestamp()
                    )

                    # candidates live in 10-second buckets; three buckets to
                    # either side cover everything within 30 seconds
                    ts_group = int(timestamp/10)
                    scrobble_describe = (timestamp,entry['spotify_track_uri'],entry['ms_played'])

                    # scrobbles count as duplicate if:
                    # - less than 30 seconds apart
                    # - exact same track uri
                    # - exact same ms_played
                    is_duplicate = any(
                        abs(scr[0] - timestamp) < 30 and scr[1:] == scrobble_describe[1:]
                        for tsg in range(ts_group-3,ts_group+4)
                        for scr in inaccurate_timestamps.get(tsg,())
                    )
                    if is_duplicate:
                        yield ('UNCERTAIN_SKIP',None,f"{entry} might be a duplicate, skipping...")
                        continue

                    # no duplicates, assume proper scrobble but warn
                    status = 'UNCERTAIN_IMPORT'
                    msg = f"{entry} might have an inaccurate timestamp."
                    inaccurate_timestamps.setdefault(ts_group,[]).append(scrobble_describe)

                yield (status,{
                    'track_title':title,
                    'track_artists': artist,
                    'track_length': None,
                    'album_name': album,
                    'scrobble_time': timestamp,
                    'scrobble_duration':played
                },msg)
            except Exception as e:
                yield ('FAIL',None,f"{entry} could not be parsed. Scrobble not imported. ({repr(e)})")
                continue

        print()
|
|
|
|
def parse_lastfm(inputf):
    """Parse a Last.fm CSV export with the columns artist, album, title, time.

    Args:
        inputf: Path of the CSV file.

    Yields:
        (status, scrobble, message) tuples. Rows that do not have exactly
        four columns, have an empty time column, or whose time cannot be
        parsed are yielded as 'FAIL' with `scrobble` set to None.
    """

    # csv wants newline=''; the encoding is fixed so that non-ASCII names do
    # not depend on the locale's default encoding
    with open(inputf,'r',newline='',encoding='utf-8') as inputfd:
        reader = csv.reader(inputfd)

        for line,row in enumerate(reader,start=1):
            try:
                artist,album,title,timestr = row
            except ValueError:
                yield ('FAIL',None,f"{row} (Line {line}) does not look like a valid entry. Scrobble not imported.")
                continue

            if timestr == '':
                yield ('FAIL',None,f"{row} (Line {line}) is missing a timestamp.")
                continue

            try:
                # the time column has no timezone; it is parsed as UTC
                scrobble_time = int(datetime.datetime.strptime(
                    timestr + '+0000',
                    "%d %b %Y %H:%M%z"
                ).timestamp())
            except Exception as e:
                yield ('FAIL',None,f"{row} (Line {line}) could not be parsed. Scrobble not imported. ({repr(e)})")
                continue

            yield ('CONFIDENT_IMPORT',{
                'track_title': title,
                'track_artists': artist,
                'track_length': None,
                'album_name': album,
                'scrobble_time': scrobble_time,
                'scrobble_duration':None
            },'')
|
|
|
|
def parse_listenbrainz(inputf):
    """Parse a ListenBrainz JSON export (a list of listens).

    Args:
        inputf: Path of the JSON file.

    Yields:
        (status, scrobble, message) tuples. Listens that raise while being
        read (e.g. a missing 'track_metadata' or 'listened_at') are yielded
        as 'FAIL' with `scrobble` set to None.
    """

    # JSON exports are UTF-8; do not depend on the locale's default encoding
    with open(inputf,'r',encoding='utf-8') as inputfd:
        data = json.load(inputfd)

    for entry in data:

        try:
            track_metadata = entry['track_metadata']
            # `or` also covers keys that are present but null, which would
            # otherwise fail the listen over purely optional information
            additional_info = track_metadata.get('additional_info') or {}
            duration_ms = additional_info.get('duration_ms') or 0

            scrobble = {
                'track_title': track_metadata['track_name'],
                'track_artists': additional_info.get('artist_names') or track_metadata['artist_name'],
                # prefer duration_ms (converted to whole seconds), else 'duration'
                'track_length': int(duration_ms / 1000) or additional_info.get('duration'),
                'album_name': track_metadata.get('release_name'),
                'scrobble_time': entry['listened_at'],
                'scrobble_duration': None,
            }
        except Exception as e:
            yield ('FAIL',None,f"{entry} could not be parsed. Scrobble not imported. ({repr(e)})")
            continue

        yield ("CONFIDENT_IMPORT",scrobble,'')
|
|
|
|
def parse_maloja(inputf):
    """Parse a Maloja JSON export (an object with a 'scrobbles' list).

    Args:
        inputf: Path of the JSON file.

    Yields:
        (status, scrobble, message) tuples. Scrobbles that raise while being
        read (e.g. a missing 'track', 'time' or 'duration') are yielded as
        'FAIL' with `scrobble` set to None.

    Raises:
        KeyError: If the file has no top-level 'scrobbles' entry.
    """

    # JSON exports are UTF-8; do not depend on the locale's default encoding
    with open(inputf,'r',encoding='utf-8') as inputfd:
        data = json.load(inputfd)

    scrobbles = data['scrobbles']

    for s in scrobbles:
        try:
            track = s['track']
            # `or {}` also covers "album": null, which would otherwise fail
            # the scrobble over a merely absent album
            album = track.get('album') or {}
            scrobble = {
                'track_title': track['title'],
                'track_artists': track['artists'],
                'track_length': track['length'],
                'album_name': album.get('name',''),
                'scrobble_time': s['time'],
                'scrobble_duration': s['duration']
            }
        except Exception as e:
            yield ('FAIL',None,f"{s} could not be parsed. Scrobble not imported. ({repr(e)})")
            continue

        yield ('CONFIDENT_IMPORT',scrobble,'')
|