mirror of
				https://github.com/krateng/maloja.git
				synced 2023-08-10 21:12:55 +03:00 
			
		
		
		
	Reworked import and added support for Spotify, GH-104
This commit is contained in:
		| @@ -111,7 +111,7 @@ def main(*args,**kwargs): | ||||
| 		"stop":stop, | ||||
| 		"run":direct, | ||||
| 		"debug":debug, | ||||
| 		"import":tasks.loadlastfm, | ||||
| 		"import":tasks.loadexternal, | ||||
| 		"backup":tasks.backuphere, | ||||
| 	#	"update":update, | ||||
| 		"fix":tasks.fixdb, | ||||
|   | ||||
| @@ -5,20 +5,17 @@ from ...globalconf import data_dir | ||||
|  | ||||
|  | ||||
|  | ||||
| def loadlastfm(filename): | ||||
| def loadexternal(filename): | ||||
|  | ||||
| 	if not os.path.exists(filename): | ||||
| 		print("File could not be found.") | ||||
| 		return | ||||
|  | ||||
| 	if os.path.exists(data_dir['scrobbles']("lastfmimport.tsv")): | ||||
| 		overwrite = ask("Already imported Last.FM data. Overwrite?",default=False) | ||||
| 		if not overwrite: return | ||||
| 	print("Please wait...") | ||||
|  | ||||
| 	from .lastfmconverter import convert | ||||
| 	imported,failed = convert(filename,data_dir['scrobbles']("lastfmimport.tsv")) | ||||
| 	print("Successfully imported",imported,"Last.FM scrobbles!") | ||||
| 	from .importer import import_scrobbles | ||||
| 	imported,failed = import_scrobbles(filename) | ||||
| 	print("Successfully imported",imported,"scrobbles!") | ||||
| 	if failed > 0: | ||||
| 		print(col['red'](str(failed) + " Errors!")) | ||||
|  | ||||
|   | ||||
							
								
								
									
										131
									
								
								maloja/proccontrol/tasks/importer.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										131
									
								
								maloja/proccontrol/tasks/importer.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,131 @@ | ||||
| import os, datetime, re | ||||
|  | ||||
| import json, csv | ||||
|  | ||||
| from ...cleanup import * | ||||
| from doreah.io import col, ask | ||||
| from ...globalconf import data_dir | ||||
| #from ...utilities import * | ||||
|  | ||||
|  | ||||
|  | ||||
|  | ||||
| c = CleanerAgent() | ||||
|  | ||||
|  | ||||
|  | ||||
def import_scrobbles(inputf):
	"""Convert an external scrobble export into a tab-separated import file.

	The source format is picked from the file extension of ``inputf``:
	``.csv`` is handled by ``parse_lastfm`` and written to
	``lastfmimport.tsv``, ``.json`` is handled by ``parse_spotify`` and
	written to ``spotifyimport.tsv`` (both inside ``data_dir['scrobbles']``).

	Each output line holds, separated by tabs: timestamp, artists (joined
	with "␟"), title, album and duration ('-' when unknown).

	Returns a ``(success, failed)`` tuple of counts. ``(0, 0)`` is returned
	when the extension is not supported or when the user declines to
	overwrite an existing import, so callers can always unpack the result.
	"""

	ext = inputf.split('.')[-1].lower()

	if ext == 'csv':
		source = "Last.fm"
		outputf = data_dir['scrobbles']("lastfmimport.tsv")
		importfunc = parse_lastfm
	elif ext == 'json':
		source = "Spotify"
		outputf = data_dir['scrobbles']("spotifyimport.tsv")
		importfunc = parse_spotify
	else:
		# Previously this fell through and crashed on an unbound name.
		print(col['red'](f"Cannot import {inputf}: only .csv (Last.fm) and .json (Spotify) exports are supported."))
		return 0,0

	print(f"Parsing {col['yellow'](inputf)} as {col['cyan'](source)} export")

	if os.path.exists(outputf):
		overwrite = ask("Already imported data. Overwrite?",default=False)
		# Return a tuple (not None) because the caller unpacks two values.
		if not overwrite: return 0,0

	success = 0
	failed = 0
	timestamps = set()

	# Explicit UTF-8: the artist separator "␟" is not encodable in many
	# platform default encodings.
	with open(outputf,"w",encoding="utf-8") as outputfd:

		for scrobble in importfunc(inputf):
			# Parsers yield None for every entry they could not read.
			if scrobble is None:
				failed += 1
				continue

			success += 1

			## We prevent double timestamps in the database creation, so we
			## technically don't need them in the files
			## however since the conversion to maloja is a one-time thing,
			## we should take any effort to make the file as good as possible
			timestamp = scrobble['timestamp']
			while timestamp in timestamps:
				timestamp += 1
			timestamps.add(timestamp)

			# Format fields for tsv
			duration = str(scrobble['duration']) if scrobble['duration'] is not None else '-'
			(artists,title) = c.fullclean(scrobble['artiststr'],scrobble['title'])
			# A missing album would make the join below raise TypeError.
			album = scrobble['album'] if scrobble['album'] is not None else ''

			outputline = "\t".join([
				str(timestamp),
				"␟".join(artists),
				title,
				album,
				duration
			])
			outputfd.write(outputline + '\n')

			if success % 100 == 0:
				print(f"Imported {success} scrobbles...")

	return success,failed
|  | ||||
|  | ||||
def parse_spotify(inputf):
	"""Yield scrobbles parsed from a Spotify streaming-history JSON file.

	``inputf`` is the path of a JSON file containing a list of entries with
	the keys ``ms_played``, ``ts``, ``master_metadata_track_name``,
	``master_metadata_album_artist_name`` and
	``master_metadata_album_album_name``.

	Entries played for 30 seconds or less are skipped silently. For every
	other entry a dict with the keys ``title``, ``artiststr``, ``album``,
	``timestamp`` (Unix seconds, ``ts`` read as UTC) and ``duration``
	(seconds played) is yielded. Entries that cannot be parsed, including
	those without a track or artist name, are reported on stdout and yielded
	as ``None``.
	"""
	with open(inputf,'r',encoding='utf-8') as inputfd:
		data = json.load(inputfd)

	for entry in data:

		# Only the parsing is guarded. The yield happens outside the try so
		# that closing the generator (GeneratorExit) or Ctrl+C is never
		# mistaken for a parse error.
		try:
			sec = int(entry['ms_played'] / 1000)
			if sec <= 30:
				continue

			title = entry['master_metadata_track_name']
			artiststr = entry['master_metadata_album_artist_name']
			if not title or not artiststr:
				raise ValueError("entry has no track or artist name")

			album = entry['master_metadata_album_album_name']
			scrobble = {
				'title': title,
				'artiststr': artiststr,
				# Normalise a null album to an empty string so it can be
				# written as a text field.
				'album': album if album is not None else '',
				'timestamp': int(datetime.datetime.strptime(
					entry['ts'].replace('Z','+0000'),
					"%Y-%m-%dT%H:%M:%S%z"
				).timestamp()),
				'duration': sec
			}
		except Exception:
			print(col['red'](str(entry) + " could not be parsed. Scrobble not imported."))
			yield None
			continue

		yield scrobble
|  | ||||
def parse_lastfm(inputf):
	"""Yield scrobbles parsed from a Last.fm CSV export.

	``inputf`` is the path of a CSV file whose rows have exactly four
	columns: artist, album, title and a time such as ``01 Jan 2021 13:37``,
	which is read as UTC.

	For every valid row a dict with the keys ``title``, ``artiststr``,
	``album``, ``timestamp`` (Unix seconds) and ``duration`` (always
	``None``, the export carries no play length) is yielded. Rows that do
	not have four columns or whose time cannot be parsed are reported on
	stdout and yielded as ``None``.
	"""

	with open(inputf,'r',newline='',encoding='utf-8') as inputfd:
		reader = csv.reader(inputfd)

		for row in reader:
			try:
				artist,album,title,timestr = row
			except ValueError:
				print(col['red'](str(row) + " does not look like a valid entry. Scrobble not imported."))
				yield None
				continue

			# Only the parsing is guarded. The yield happens outside the try
			# so that closing the generator (GeneratorExit) or Ctrl+C is
			# never mistaken for a parse error.
			try:
				scrobble = {
					'title': title,
					'artiststr': artist,
					'album': album,
					'timestamp': int(datetime.datetime.strptime(
						timestr + '+0000',
						"%d %b %Y %H:%M%z"
					).timestamp()),
					'duration': None
				}
			except Exception:
				print(col['red'](str(row) + " could not be parsed. Scrobble not imported."))
				yield None
				continue

			yield scrobble
| @@ -1,78 +0,0 @@ | ||||
| import os, datetime, re | ||||
| from ...cleanup import * | ||||
| from doreah.io import col | ||||
| #from ...utilities import * | ||||
|  | ||||
|  | ||||
|  | ||||
|  | ||||
| c = CleanerAgent() | ||||
|  | ||||
|  | ||||
|  | ||||
def convert(input,output):
	"""Convert a Last.fm CSV export into a tab-separated scrobble file.

	``input`` is the path of the export; every line is expected to be
	``artist,album,title,time`` with a time such as ``01 Jan 2021 13:37``.
	``output`` is the path of the file to write; each line holds timestamp,
	artists (joined with "␟"), title and album, separated by tabs.

	Lines that do not split into four fields or whose contents cannot be
	processed are reported on stdout and counted as failed.

	Returns a ``(success, failed)`` tuple of counts.
	"""

	months = {"Jan":1,"Feb":2,"Mar":3,"Apr":4,"May":5,"Jun":6,"Jul":7,"Aug":8,"Sep":9,"Oct":10,"Nov":11,"Dec":12}

	# Timestamps already written. A set keeps the duplicate check O(1); the
	# former list with insert(0) made large imports quadratic.
	stamps = set()

	success = 0
	failed = 0

	# Explicit UTF-8 on the output too: the artist separator "␟" is not
	# encodable in many platform default encodings.
	with open(input,"r",encoding="utf-8") as log, open(output,"w",encoding="utf-8") as outputlog:

		for l in log:
			l = l.replace("\n","")
			# NOTE(review): a plain split does not understand quoted fields,
			# so entries containing a comma are rejected here.
			try:
				artist,album,title,time = l.split(",")
			except ValueError:
				print(col['red']("Line '" + l + "' does not look like a valid entry. Scrobble not imported."))
				failed += 1
				continue

			try:
				(artists,title) = c.fullclean(artist,title)
				artistsstr = "␟".join(artists)

				timeparts = time.split(" ")
				(h,m) = timeparts[3].split(":")

				# Interpreted as UTC so the result does not depend on the
				# timezone of the machine running the import.
				timestamp = int(datetime.datetime(
					int(timeparts[2]),months[timeparts[1]],int(timeparts[0]),int(h),int(m),
					tzinfo=datetime.timezone.utc
				).timestamp())

				## We prevent double timestamps in the database creation, so we technically don't need them in the files
				## however since the conversion from lastfm to maloja is a one-time thing, we should take any effort to make
				## the file as good as possible
				while timestamp in stamps:
					timestamp -= 1
				stamps.add(timestamp)
			except Exception:
				print(col['red']("Line '" + l + "' could not be parsed. Scrobble not imported."))
				failed += 1
				continue

			entry = "\t".join([str(timestamp),artistsstr,title,album])

			outputlog.write(entry)
			outputlog.write("\n")

			success += 1

			if success % 100 == 0:
				print("Imported " + str(success) + " scrobbles...")

	return (success,failed)
		Reference in New Issue
	
	Block a user
	 krateng
					krateng