Mirror of https://github.com/krateng/maloja.git
Synced 2023-08-10 21:12:55 +03:00

Commit 1b087e92db: Merge branch 'master' into v3
@@ -1,5 +1,6 @@
 import random
 import datetime
+from doreah.io import ask

 from ...database.sqldb import add_scrobbles

@@ -70,21 +71,22 @@ def generate_track():

 def generate(n=200):
 	n = int(n)

-	scrobbles = []
-	for _ in range(n):
-		track = generate_track()
-		print("Generated",track)
-		for _ in range(random.randint(1, 50)):
-			timestamp = random.randint(1, int(datetime.datetime.now().timestamp()))
-
-			scrobbles.append({
-				"time":timestamp,
-				"track":{
-					"artists":track['artists'],
-					"title":track['title']
-				},
-				"duration":None,
-				"origin":"generated"
-			})
-
-	add_scrobbles(scrobbles)
+	if ask("Generate random scrobbles?",default=False):
+		scrobbles = []
+		for _ in range(n):
+			track = generate_track()
+			print("Generated",track)
+			for _ in range(random.randint(1, 50)):
+				timestamp = random.randint(1, int(datetime.datetime.now().timestamp()))
+
+				scrobbles.append({
+					"time":timestamp,
+					"track":{
+						"artists":track['artists'],
+						"title":track['title']
+					},
+					"duration":None,
+					"origin":"generated"
+				})
+
+		add_scrobbles(scrobbles)
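For orientation: the payload handed to add_scrobbles is unchanged here, the commit only gates generation behind a confirmation prompt. A minimal sketch of one buffered entry (values are illustrative; the list shape of "artists" is an assumption, since generate_track() is not part of this hunk):

	# Illustrative only: one entry as generate() buffers it before passing
	# the whole list to add_scrobbles(). The list value for "artists" is an
	# assumption based on how track['artists'] is used elsewhere.
	example_scrobble = {
		"time": 1637264400,        # random epoch second between 1 and now
		"track": {
			"artists": ["Artist A"],
			"title": "Some Title"
		},
		"duration": None,
		"origin": "generated"
	}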
@@ -1,57 +1,69 @@
 import os, datetime, re
 import json, csv

+from doreah.io import col, ask, prompt
+
 from ...cleanup import *
-from doreah.io import col, ask
 from ...globalconf import data_dir

 from ...database.sqldb import add_scrobbles
 #from ...images import *


 c = CleanerAgent()

+outputs = {
+	"CONFIDENT_IMPORT": lambda msg: None,
+	"UNCERTAIN_IMPORT": lambda msg: print(col['orange'](msg)),
+	#"CONFIDENT_SKIP": lambda msg: print(col['ffcba4'](msg)),
+	"CONFIDENT_SKIP": lambda msg: None,
+	"UNCERTAIN_SKIP": lambda msg: print(col['orange'](msg)),
+	"FAIL": lambda msg: print(col['red'](msg)),
+}

 # TODO db import
-def import_scrobbles(fromfile):
-
-	if not os.path.exists(fromfile):
-		print("File could not be found.")
-		return
-
-	ext = fromfile.split('.')[-1].lower()
+def import_scrobbles(inputf):
+
+	result = {
+		"CONFIDENT_IMPORT": 0,
+		"UNCERTAIN_IMPORT": 0,
+		"CONFIDENT_SKIP": 0,
+		"UNCERTAIN_SKIP": 0,
+		"FAIL": 0
+	}

-	if ext == 'csv':
-		import_type = "Last.fm"
+	filename = os.path.basename(inputf)
+
+	if re.match(".*\.csv",filename):
+		type = "Last.fm"
 		importfunc = parse_lastfm

-	elif ext == 'json':
-		import_type = "Spotify"
-		importfunc = parse_spotify
+	elif re.match("endsong_[0-9]+\.json",filename):
+		type = "Spotify"
+		importfunc = parse_spotify_full
+
+	elif re.match("StreamingHistory[0-9]+\.json",filename):
+		type = "Spotify"
+		importfunc = parse_spotify_lite
+
+	else:
+		print("File",inputf,"could not be identified as a valid import source.")
+		return result

-	print(f"Parsing {col['yellow'](fromfile)} as {col['cyan'](import_type)} export")
+	print(f"Parsing {col['yellow'](inputf)} as {col['cyan'](type)} export")

-	success = 0
-	failed = 0
 	timestamps = set()
 	scrobblebuffer = []

-	for scrobble in importfunc(fromfile):
-		if scrobble is None:
-			failed += 1
-		else:
-			success += 1
-
-			# prevent duplicate timestamps
+	for status,scrobble,msg in importfunc(inputf):
+		result[status] += 1
+		outputs[status](msg)
+		if status in ['CONFIDENT_IMPORT','UNCERTAIN_IMPORT']:
+
+			# prevent duplicate timestamps within one import file
 			while scrobble['timestamp'] in timestamps:
 				scrobble['timestamp'] += 1
 			timestamps.add(scrobble['timestamp'])

 			# clean up
 			(scrobble['artists'],scrobble['title']) = c.fullclean(scrobble['artists'],scrobble['title'])
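The core of this rewrite is the new parser contract: instead of yielding a scrobble dict or None, every import function now yields (status, scrobble, msg) triples, which import_scrobbles tallies in result, routes through the outputs callbacks, and buffers for add_scrobbles. A minimal sketch of a conforming parser (hypothetical, not part of the commit):

	# Hypothetical parser following the new (status, scrobble, msg) contract.
	# Statuses must be keys of the result/outputs dicts above.
	def parse_example(inputf):
		entries = [
			("Artist A", "Track 1", 1637264400),
			("Artist B", "Track 2", 1637264400),   # deliberate timestamp collision
		]
		for artist, title, ts in entries:
			if not title:
				yield ('FAIL', None, f"{(artist, title)} has no title")
				continue
			yield ('CONFIDENT_IMPORT', {
				'artists': artist,
				'title': title,
				'album': None,
				'timestamp': ts,
				'duration': None
			}, '')

Note the deliberate collision above: the outer loop's while-bump over scrobble['timestamp'] would move the second entry to 1637264401, so both plays survive as distinct scrobbles.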
@@ -71,43 +83,178 @@ def import_scrobbles(fromfile):
 				}
 			})

-			if success % 1000 == 0:
-				print(f"Imported {success} scrobbles...")
+			if (result['CONFIDENT_IMPORT'] + result['UNCERTAIN_IMPORT']) % 1000 == 0:
+				print(f"Imported {result['CONFIDENT_IMPORT'] + result['UNCERTAIN_IMPORT']} scrobbles...")
 				add_scrobbles(scrobblebuffer)
 				scrobblebuffer = []

 	add_scrobbles(scrobblebuffer)
-	print("Successfully imported",success,"scrobbles!")
-	if failed > 0:
-		print(col['red'](str(failed) + " Errors!"))
-	return success,failed
+
+	msg = f"Successfully imported {result['CONFIDENT_IMPORT'] + result['UNCERTAIN_IMPORT']} scrobbles"
+	if result['UNCERTAIN_IMPORT'] > 0:
+		warningmsg = col['orange'](f"{result['UNCERTAIN_IMPORT']} Warning{'s' if result['UNCERTAIN_IMPORT'] != 1 else ''}!")
+		msg += f" ({warningmsg})"
+	print(msg)
+
+	msg = f"Skipped {result['CONFIDENT_SKIP'] + result['UNCERTAIN_SKIP']} scrobbles"
+	if result['UNCERTAIN_SKIP'] > 0:
+		warningmsg = col['orange'](f"{result['UNCERTAIN_SKIP']} Warning{'s' if result['UNCERTAIN_SKIP'] != 1 else ''}!")
+		msg += f" ({warningmsg})"
+	print(msg)
+
+	if result['FAIL'] > 0:
+		print(col['red'](f"{result['FAIL']} Error{'s' if result['FAIL'] != 1 else ''}!"))
+
+	return result


-def parse_spotify(inputf):
-	with open(inputf,'r') as inputfd:
-		data = json.load(inputfd)
-
-	for entry in data:
-
-		sec = int(entry['ms_played'] / 1000)
-
-		if sec > 30:
-			try:
-				yield {
-					'title':entry['master_metadata_track_name'],
-					'artists': entry['master_metadata_album_artist_name'],
-					'album': entry['master_metadata_album_album_name'],
-					'timestamp': int(datetime.datetime.strptime(
-						entry['ts'].replace('Z','+0000',),
-						"%Y-%m-%dT%H:%M:%S%z"
-					).timestamp()),
-					'duration':sec
-				}
-			except:
-				print(col['red'](str(entry) + " could not be parsed. Scrobble not imported."))
-				yield None
+def parse_spotify_lite(inputf):
+	inputfolder = os.path.dirname(inputf)
+	filenames = re.compile(r'StreamingHistory[0-9]+\.json')
+	inputfiles = [os.path.join(inputfolder,f) for f in os.listdir(inputfolder) if filenames.match(f)]
+
+	if inputfiles != [inputf]:
+		print("Spotify files should all be imported together to identify duplicates across the whole dataset.")
+		if not ask("Import " + ", ".join(col['yellow'](i) for i in inputfiles) + "?",default=True):
+			inputfiles = [inputf]
+
+	for inputf in inputfiles:
+
+		print("Importing",col['yellow'](inputf),"...")
+		with open(inputf,'r') as inputfd:
+			data = json.load(inputfd)
+
+		for entry in data:
+
+			try:
+				played = int(entry['msPlayed'] / 1000)
+				timestamp = int(
+					datetime.datetime.strptime(entry['endTime'],"%Y-%m-%d %H:%M").timestamp()
+				)
+				artist = entry['artistName']
+				title = entry['trackName']
+
+				if played < 30:
+					yield ('CONFIDENT_SKIP',None,f"{entry} is shorter than 30 seconds, skipping...")
+					continue
+
+				yield ("CONFIDENT_IMPORT",{
+					'title':title,
+					'artists': artist,
+					'timestamp': timestamp,
+					'duration':played,
+					'album': None
+				},'')
+			except Exception as e:
+				yield ('FAIL',None,f"{entry} could not be parsed. Scrobble not imported. ({repr(e)})")
+				continue
+
+	print()
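Judging by the field accesses in parse_spotify_lite, an entry in Spotify's StreamingHistoryN.json account export looks roughly like the sketch below (field names from the code, values made up). endTime and msPlayed are converted to the internal timestamp/duration, and anything under 30 seconds is skipped:

	import datetime

	# Shape implied by the parser above; illustrative values.
	entry = {
		"endTime": "2021-11-18 19:00",   # minute resolution, no timezone info
		"artistName": "Artist A",
		"trackName": "Track 1",
		"msPlayed": 213000
	}

	played = int(entry["msPlayed"] / 1000)   # 213 seconds, passes the 30s cutoff
	timestamp = int(datetime.datetime.strptime(
		entry["endTime"], "%Y-%m-%d %H:%M"
	).timestamp())                           # naive, so interpreted as local time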
+
+
+def parse_spotify_full(inputf):
+
+	inputfolder = os.path.dirname(inputf)
+	filenames = re.compile(r'endsong_[0-9]+\.json')
+	inputfiles = [os.path.join(inputfolder,f) for f in os.listdir(inputfolder) if filenames.match(f)]
+
+	if inputfiles != [inputf]:
+		print("Spotify files should all be imported together to identify duplicates across the whole dataset.")
+		if not ask("Import " + ", ".join(col['yellow'](i) for i in inputfiles) + "?",default=True):
+			inputfiles = [inputf]
+
+	# we keep timestamps here as well to remove duplicates because spotify's export
+	# is messy - this is specific to this import type and should not be mixed with
+	# the outer function timestamp check (which is there to fix duplicate timestamps
+	# that are assumed to correspond to actually distinct plays)
+	timestamps = {}
+	inaccurate_timestamps = {}
+
+	for inputf in inputfiles:
+
+		print("Importing",col['yellow'](inputf),"...")
+		with open(inputf,'r') as inputfd:
+			data = json.load(inputfd)
+
+		for entry in data:
+
+			try:
+				played = int(entry['ms_played'] / 1000)
+				timestamp = int(entry['offline_timestamp'] / 1000)
+				artist = entry['master_metadata_album_artist_name']
+				title = entry['master_metadata_track_name']
+				album = entry['master_metadata_album_album_name']
+
+				if title is None:
+					yield ('CONFIDENT_SKIP',None,f"{entry} has no title, skipping...")
+					continue
+				if artist is None:
+					yield ('CONFIDENT_SKIP',None,f"{entry} has no artist, skipping...")
+					continue
+				if played < 30:
+					yield ('CONFIDENT_SKIP',None,f"{entry} is shorter than 30 seconds, skipping...")
+					continue
+
+				# if offline_timestamp is a proper number, we treat it as
+				# accurate and check duplicates by that exact timestamp
+				if timestamp != 0:
+
+					if timestamp in timestamps and (artist,title) in timestamps[timestamp]:
+						yield ('CONFIDENT_SKIP',None,f"{entry} seems to be a duplicate, skipping...")
+						continue
+					else:
+						status = 'CONFIDENT_IMPORT'
+						msg = ''
+						timestamps.setdefault(timestamp,[]).append((artist,title))
+
+				# if it's 0, we use ts instead, but identify duplicates differently
+				# (cause the ts is not accurate)
+				else:
+
+					timestamp = int(
+						datetime.datetime.strptime(entry['ts'].replace('Z','+0000'),"%Y-%m-%dT%H:%M:%S%z").timestamp()
+					)
+
+					ts_group = int(timestamp/10)
+					relevant_ts_groups = [ts_group-3,ts_group-2,ts_group-1,ts_group,ts_group+1,ts_group+2,ts_group+3]
+					similar_scrobbles = [scrob for tsg in relevant_ts_groups for scrob in inaccurate_timestamps.get(tsg,[])]
+
+					scrobble_describe = (timestamp,entry['spotify_track_uri'],entry['ms_played'])
+					found_similar = False
+					for scr in similar_scrobbles:
+						# scrobbles count as duplicate if:
+						# - less than 30 seconds apart
+						# - exact same track uri
+						# - exact same ms_played
+						if (abs(scr[0] - timestamp) < 30) and scr[1:] == scrobble_describe[1:]:
+							yield ('UNCERTAIN_SKIP',None,f"{entry} might be a duplicate, skipping...")
+							found_similar = True
+							break
+					else:
+						# no duplicates, assume proper scrobble but warn
+						status = 'UNCERTAIN_IMPORT'
+						msg = f"{entry} might have an inaccurate timestamp."
+						inaccurate_timestamps.setdefault(ts_group,[]).append(scrobble_describe)
+
+					if found_similar:
+						continue
+
+				yield (status,{
+					'title':title,
+					'artists': artist,
+					'album': album,
+					'timestamp': timestamp,
+					'duration':played
+				},msg)
+			except Exception as e:
+				yield ('FAIL',None,f"{entry} could not be parsed. Scrobble not imported. ({repr(e)})")
+				continue
+
+	print()
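The fuzzy duplicate check above is a bucketing scheme: timestamps fall into 10-second buckets via int(timestamp/10), and each candidate is compared only against scrobbles in its own bucket and the three buckets on either side, which covers the full 30-second tolerance without scanning the whole history. A small worked example with illustrative timestamps:

	# Two reports of the same play, 25 seconds apart:
	ts_a, ts_b = 1637264400, 1637264425
	group_a, group_b = int(ts_a / 10), int(ts_b / 10)   # 163726440, 163726442

	# ts_b's bucket is within +/-3 of ts_a's, so the earlier scrobble is among
	# the candidates; with an identical track uri and ms_played the second
	# entry would be yielded as UNCERTAIN_SKIP.
	assert abs(group_a - group_b) <= 3
	assert abs(ts_a - ts_b) < 30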


 def parse_lastfm(inputf):

 	with open(inputf,'r',newline='') as inputfd:

@@ -117,22 +264,20 @@ def parse_lastfm(inputf):
 			try:
 				artist,album,title,time = row
 			except ValueError:
-				print(col['red'](str(row) + " does not look like a valid entry. Scrobble not imported."))
-				yield None
+				yield ('FAIL',None,f"{row} does not look like a valid entry. Scrobble not imported.")
 				continue

 			try:
-				yield {
-					'title': row[2],
-					'artists': row[0],
-					'album': row[1],
+				yield ('CONFIDENT_IMPORT',{
+					'title': title,
+					'artists': artist,
+					'album': album,
 					'timestamp': int(datetime.datetime.strptime(
-						row[3] + '+0000',
+						time + '+0000',
 						"%d %b %Y %H:%M%z"
 					).timestamp()),
 					'duration':None
-				}
-			except:
-				print(col['red'](str(row) + " could not be parsed. Scrobble not imported."))
-				yield None
+				},'')
+			except Exception as e:
+				yield ('FAIL',None,f"{row} could not be parsed. Scrobble not imported. ({repr(e)})")
+				continue
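Based on the four-way unpacking and the "%d %b %Y %H:%M%z" format, a row of the Last.fm CSV export presumably carries artist, album, title and a UTC time without an offset. A sketch of how one row is consumed (illustrative values):

	import datetime

	# Illustrative row as csv.reader would deliver it to parse_lastfm:
	row = ["Artist A", "Album X", "Track 1", "18 Nov 2021 19:00"]
	artist, album, title, time = row

	# '+0000' is appended so the naive export time parses as UTC:
	ts = int(datetime.datetime.strptime(time + '+0000', "%d %b %Y %H:%M%z").timestamp())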