mirror of https://github.com/krateng/maloja.git (synced 2023-08-10 21:12:55 +03:00)

commit 2a1f188e37 (parent 27cacbf658)
Changed Spotify import to use all files and discard duplicates, GH-104
@@ -15,9 +15,9 @@ def loadexternal(filename):
 	imported,failed,warning = import_scrobbles(filename)
 	print("Successfully imported",imported,"scrobbles!")
 	if warning > 0:
-		print(col['orange'](str(warning) + " Warnings!"))
+		print(col['orange'](f"{warning} Warning{'s' if warning != 1 else ''}!"))
 	if failed > 0:
-		print(col['red'](str(failed) + " Errors!"))
+		print(col['red'](f"{failed} Error{'s' if failed != 1 else ''}!"))
 
 def backuphere():
 	from .backup import backup
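Review note: the rewritten print calls fold the pluralization into the f-string. A standalone demo of the same conditional expression, with maloja's col[...] color helpers left out:

    # the 's' is appended for every count except exactly 1
    for warning in (0, 1, 2):
        print(f"{warning} Warning{'s' if warning != 1 else ''}!")
    # prints: 0 Warnings! / 1 Warning! / 2 Warnings!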
@@ -26,17 +26,18 @@ def import_scrobbles(inputf):
 		importfunc = parse_lastfm
 
 
-	elif ext == 'json':
+	elif ext == 'json' or os.path.isdir(inputf):
 		type = "Spotify"
 		outputf = data_dir['scrobbles']("spotifyimport.tsv")
 		importfunc = parse_spotify
+		if os.path.isfile(inputf): inputf = os.path.dirname(inputf)
 
 
 	print(f"Parsing {col['yellow'](inputf)} as {col['cyan'](type)} export")
 
 	if os.path.exists(outputf):
 		while True:
-			action = prompt("Already imported data. Overwrite (o), append (a) or cancel (c)?",default='c')
+			action = prompt(f"Already imported {type} data. [O]verwrite, [A]ppend or [C]ancel?",default='c').lower()[0]
 			if action == 'c':
 				return 0,0,0
 			elif action == 'a':
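Review note: the Spotify branch now accepts either a single JSON file or the directory of a full export, normalizing a file path to its parent directory so every file of the export gets picked up. The same normalization in isolation (a sketch; normalize_spotify_input is a hypothetical name, not part of maloja):

    import os

    def normalize_spotify_input(export_path):
        # If the user points at one endsong_*.json file, walk up to its
        # containing directory so the whole export can be scanned.
        if os.path.isfile(export_path):
            export_path = os.path.dirname(export_path)
        return export_path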
@@ -62,10 +63,6 @@ def import_scrobbles(inputf):
 		else:
 			success += 1
 
-		## We prevent double timestamps in the database creation, so we
-		## technically don't need them in the files
-		## however since the conversion to maloja is a one-time thing,
-		## we should take any effort to make the file as good as possible
 		while scrobble['timestamp'] in timestamps:
 			scrobble['timestamp'] += 1
 		timestamps.add(scrobble['timestamp'])
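Review note: the deleted ## comments explained why colliding timestamps are nudged; the loop itself stays. Extracted as a self-contained sketch (make_unique is an illustrative name, not maloja code):

    def make_unique(ts, seen):
        # bump a colliding timestamp forward one second at a time,
        # then record it - mirrors the while-loop kept above
        while ts in seen:
            ts += 1
        seen.add(ts)
        return ts

    seen = set()
    assert [make_unique(t, seen) for t in (100, 100, 100)] == [100, 101, 102]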
@@ -92,41 +89,73 @@ def import_scrobbles(inputf):
 
 
 def parse_spotify(inputf):
-	with open(inputf,'r') as inputfd:
-		data = json.load(inputfd)
-
-	for entry in data:
-
-		sec = int(entry['ms_played'] / 1000)
-
-		if entry['master_metadata_track_name'] is None:
-			warn(f"{entry} has no title, skipping...")
-			yield False
-			continue
-		if entry['master_metadata_album_artist_name'] is None:
-			warn(f"{entry} has no artist, skipping...")
-			yield False
-			continue
-		if sec < 30:
-			warn(f"{entry} is shorter than 30 seconds, skipping...")
-			yield False
-			continue
-
-		try:
-			yield {
-				'title':entry['master_metadata_track_name'],
-				'artiststr': entry['master_metadata_album_artist_name'],
-				'album': entry['master_metadata_album_album_name'],
-				'timestamp': int(datetime.datetime.strptime(
-					entry['ts'].replace('Z','+0000',),
-					"%Y-%m-%dT%H:%M:%S%z"
-				).timestamp()),
-				'duration':sec
-			}
-		except Exception as e:
-			err(f"{entry} could not be parsed. Scrobble not imported. ({repr(e)})")
-			yield None
-			continue
+	filenames = re.compile(r'endsong_[0-9]+\.json')
+	inputfiles = [os.path.join(inputf,f) for f in os.listdir(inputf) if filenames.match(f)]
+
+	if len(inputfiles) == 0:
+		print("No files found!")
+	elif ask("Importing the following files: " + ", ".join(col['yellow'](i) for i in inputfiles) + ". Confirm?", default=False):
+
+		# we keep timestamps here as well to remove duplicates because spotify's export
+		# is messy - this is specific to this import type and should not be mixed with
+		# the outer function timestamp check (which is there to fix duplicate timestamps
+		# that are assumed to correspond to actually distinct plays)
+		timestamps = {}
+
+		for inputf in inputfiles:
+
+			print("Importing",col['yellow'](inputf),"...")
+			with open(inputf,'r') as inputfd:
+				data = json.load(inputfd)
+
+			for entry in data:
+
+				try:
+					sec = int(entry['ms_played'] / 1000)
+					timestamp = entry['offline_timestamp']
+					artist = entry['master_metadata_album_artist_name']
+					title = entry['master_metadata_track_name']
+					album = entry['master_metadata_album_album_name']
+
+					if title is None:
+						warn(f"{entry} has no title, skipping...")
+						yield False
+						continue
+					if artist is None:
+						warn(f"{entry} has no artist, skipping...")
+						yield False
+						continue
+					if sec < 30:
+						warn(f"{entry} is shorter than 30 seconds, skipping...")
+						yield False
+						continue
+					if timestamp in timestamps and (artist,title) in timestamps[timestamp]:
+						warn(f"{entry} seems to be a duplicate, skipping...")
+						yield False
+						continue
+
+					timestamps.setdefault(timestamp,[]).append((artist,title))
+
+					yield {
+						'title':title,
+						'artiststr': artist,
+						'album': album,
+						# 'timestamp': int(datetime.datetime.strptime(
+						# 	entry['ts'].replace('Z','+0000',),
+						# 	"%Y-%m-%dT%H:%M:%S%z"
+						# ).timestamp()),
+						'timestamp': timestamp,
+						'duration':sec
+					}
+				except Exception as e:
+					err(f"{entry} could not be parsed. Scrobble not imported. ({repr(e)})")
+					yield None
+					continue
+
+			print()
 
 
 def parse_lastfm(inputf):
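Review note: the new duplicate check keys on the timestamp first and then on the (artist, title) pair, since the same play can appear in more than one endsong file of a Spotify export. Stripped of the import machinery, the check behaves like this sketch (is_duplicate is an illustrative name, not maloja code):

    def is_duplicate(timestamp, artist, title, timestamps):
        # timestamps maps a play's timestamp to the (artist, title)
        # pairs already seen at that instant, like the dict above
        if timestamp in timestamps and (artist, title) in timestamps[timestamp]:
            return True
        timestamps.setdefault(timestamp, []).append((artist, title))
        return False

    timestamps = {}
    assert is_duplicate(1, 'A', 'T', timestamps) is False
    assert is_duplicate(1, 'A', 'T', timestamps) is True   # exact repeat: duplicate
    assert is_duplicate(1, 'B', 'T', timestamps) is False  # same second, other artist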
@@ -143,11 +172,11 @@ def parse_lastfm(inputf):
 
 		try:
 			yield {
-				'title': row[2],
-				'artiststr': row[0],
-				'album': row[1],
+				'title': title,
+				'artiststr': artist,
+				'album': album,
 				'timestamp': int(datetime.datetime.strptime(
-					row[3] + '+0000',
+					time + '+0000',
 					"%d %b %Y %H:%M%z"
 				).timestamp()),
 				'duration':None
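Review note: the last.fm timestamp parsing is unchanged apart from the renamed variable: the exported time gets '+0000' appended and is parsed as UTC via %z. A quick standalone check, assuming a sample value shaped the way the "%d %b %Y %H:%M%z" format expects:

    import datetime

    time = '14 Aug 2022 16:04'  # assumed sample row value, UTC
    ts = int(datetime.datetime.strptime(time + '+0000', "%d %b %Y %H:%M%z").timestamp())
    print(ts)  # 1660493040, i.e. 2022-08-14 16:04:00 UTC

Note that %b is locale-dependent, so this parse assumes an English locale.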