Mirror of https://github.com/krateng/maloja.git (synced 2023-08-10 21:12:55 +03:00)
	Reworked import
@@ -12,10 +12,12 @@ def loadexternal(filename):
		return

	from .importer import import_scrobbles
	imported,failed,warning = import_scrobbles(filename)
	imported,warning,skipped,failed = import_scrobbles(filename)
	print("Successfully imported",imported,"scrobbles!")
	if warning > 0:
		print(col['orange'](f"{warning} Warning{'s' if warning != 1 else ''}!"))
	if skipped > 0:
		print(col['orange'](f"{skipped} Skipped!"))
	if failed > 0:
		print(col['red'](f"{failed} Error{'s' if failed != 1 else ''}!"))

@@ -18,19 +18,24 @@ def err(msg):

def import_scrobbles(inputf):

	ext = inputf.split('.')[-1].lower()

	if ext == 'csv':
	if re.match(".*\.csv",inputf):
		type = "Last.fm"
		outputf = data_dir['scrobbles']("lastfmimport.tsv")
		importfunc = parse_lastfm


	elif ext == 'json' or os.path.isdir(inputf):
	elif re.match("endsong_[0-9]+\.json",inputf):
		type = "Spotify"
		outputf = data_dir['scrobbles']("spotifyimport.tsv")
		importfunc = parse_spotify
		if os.path.isfile(inputf): inputf = os.path.dirname(inputf)
		importfunc = parse_spotify_full

	elif re.match("StreamingHistory[0-9]+\.json",inputf):
		type = "Spotify"
		outputf = data_dir['scrobbles']("spotifyimport.tsv")
		importfunc = parse_spotify_lite

	else:
		print("File",inputf,"could not be identified as a valid import source.")
		return 0,0,0,0


	print(f"Parsing {col['yellow'](inputf)} as {col['cyan'](type)} export")
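The hunk above replaces extension sniffing with filename patterns to pick a parser; endsong_N.json identifies Spotify's full history export, StreamingHistoryN.json the lighter account-data export. A rough standalone sketch of that dispatch idea (the names and the basename matching are assumptions for illustration, not maloja's actual code):

import os
import re

# Illustrative dispatch table: map export filename patterns to a source label.
PATTERNS = [
	(re.compile(r".*\.csv$"), "Last.fm"),
	(re.compile(r"endsong_[0-9]+\.json$"), "Spotify (full history export)"),
	(re.compile(r"StreamingHistory[0-9]+\.json$"), "Spotify (account data export)"),
]

def identify(path):
	# match on the basename so a full path still resolves (assumption)
	name = os.path.basename(path)
	for pattern, label in PATTERNS:
		if pattern.match(name):
			return label
	return None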
@@ -40,7 +45,7 @@ def import_scrobbles(inputf):
		while True:
			action = prompt(f"Already imported {type} data. [O]verwrite, [A]ppend or [C]ancel?",default='c').lower()[0]
			if action == 'c':
				return 0,0,0
				return 0,0,0,0
			elif action == 'a':
				mode = 'a'
				break
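For reference, a minimal sketch of the overwrite/append/cancel prompt pattern shown above, using plain input() instead of maloja's prompt() helper; mapping 'o' to write mode is an assumption here:

# Toy version of the cancel/append/overwrite decision; returns a file mode
# or None when the caller should bail out with all-zero counters.
def choose_mode(exporttype):
	while True:
		answer = input(f"Already imported {exporttype} data. [O]verwrite, [A]ppend or [C]ancel? ") or 'c'
		action = answer.lower()[0]
		if action == 'c':
			return None   # caller returns 0,0,0,0
		elif action == 'a':
			return 'a'    # append to the existing import file
		elif action == 'o':
			return 'w'    # overwrite it (assumed mapping)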
@@ -54,18 +59,18 @@ def import_scrobbles(inputf):


	with open(outputf,mode) as outputfd:
		success = 0
		failed = 0
		warning = 0
		success, warning, skipped, failed = 0, 0, 0, 0
		timestamps = set()

		for scrobble in importfunc(inputf):
			if scrobble is None:
		for status,scrobble in importfunc(inputf):
			if status == 'FAIL':
				failed += 1
			elif scrobble is False:
				warning += 1
			elif status == 'SKIP':
				skipped += 1
			else:
				success += 1
				if status == 'WARN':
					warning += 1

				while scrobble['timestamp'] in timestamps:
					scrobble['timestamp'] += 1
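The loop above implies a new contract for the parse functions: instead of yielding a scrobble dict, None on error, or False on a warning, each parser now yields a (status, scrobble) pair with status one of 'SUCCESS', 'WARN', 'SKIP' or 'FAIL', where 'WARN' entries are still imported. A self-contained sketch of that contract (the toy parser and its input rows are illustrative only, not maloja's parsers):

# Toy parser conforming to the (status, scrobble) generator contract.
def toy_parser(rows):
	for row in rows:
		try:
			artist, title, timestamp = row
		except ValueError:
			yield ('FAIL', None)      # entry could not be parsed at all
			continue
		if not title:
			yield ('SKIP', None)      # parseable, but deliberately not imported
			continue
		status = 'SUCCESS' if timestamp else 'WARN'   # 'WARN' still counts as imported
		yield (status, {'artiststr': artist, 'title': title, 'timestamp': timestamp or 0})

# Tally loop mirroring the hunk above.
success = warning = skipped = failed = 0
for status, scrobble in toy_parser([('Artist', 'Song', 1645566420), ('Artist', '', 0), ('bad row',)]):
	if status == 'FAIL':
		failed += 1
	elif status == 'SKIP':
		skipped += 1
	else:
		success += 1
		if status == 'WARN':
			warning += 1
# success == 1, warning == 0, skipped == 1, failed == 1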
@@ -89,24 +94,37 @@ def import_scrobbles(inputf):
				if success % 100 == 0:
					print(f"Imported {success} scrobbles...")

	return success,failed,warning
	return success, warning, skipped, failed

def parse_spotify_lite(inputf):
	inputfolder = os.path.dirname(inputf)
	filenames = re.compile(r'StreamingHistory[0-9]+\.json')
	inputfiles = [os.path.join(inputfolder,f) for f in os.listdir(inputfolder) if filenames.match(f)]

def parse_spotify(inputf):
	if inputfiles != [inputf]:
		print("Spotify files should all be imported together to identify duplicates across the whole dataset.")
		if not ask("Import " + ", ".join(col['yellow'](i) for i in inputfiles) + "?",default=True):
			inputfiles = [inputf]

	# TODO

def parse_spotify_full(inputf):

	inputfolder = os.path.dirname(inputf)
	filenames = re.compile(r'endsong_[0-9]+\.json')
	inputfiles = [os.path.join(inputfolder,f) for f in os.listdir(inputfolder) if filenames.match(f)]

	inputfiles = [os.path.join(inputf,f) for f in os.listdir(inputf) if filenames.match(f)]

	if len(inputfiles) == 0:
		print("No files found!")
	elif ask("Importing the following files: " + ", ".join(col['yellow'](i) for i in inputfiles) + ". Confirm?", default=False):
	if inputfiles != [inputf]:
		print("Spotify files should all be imported together to identify duplicates across the whole dataset.")
		if not ask("Import " + ", ".join(col['yellow'](i) for i in inputfiles) + "?",default=True):
			inputfiles = [inputf]

	# we keep timestamps here as well to remove duplicates because spotify's export
	# is messy - this is specific to this import type and should not be mixed with
	# the outer function timestamp check (which is there to fix duplicate timestamps
	# that are assumed to correspond to actually distinct plays)
	timestamps = {}
	inaccurate_timestamps = {}

	for inputf in inputfiles:

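The new parse_spotify_lite/parse_spotify_full functions start by collecting every sibling file that follows the export naming scheme, so duplicates can be identified across the whole dataset, and then ask before widening the import beyond the file that was passed in. A standalone sketch of that discovery step (the helper name and default pattern are assumptions for illustration):

import os
import re

# Given one export file, pick up every file in the same folder that follows
# the export naming scheme; fall back to just the given file if nothing matches.
def discover_exports(inputf, pattern=r'endsong_[0-9]+\.json'):
	folder = os.path.dirname(inputf) or '.'
	filenames = re.compile(pattern)
	found = [os.path.join(folder, f) for f in os.listdir(folder) if filenames.match(f)]
	return found or [inputf]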
@@ -117,7 +135,7 @@ def parse_spotify(inputf):
		for entry in data:

			try:
					sec = int(entry['ms_played'] / 1000)
				played = int(entry['ms_played'] / 1000)
				timestamp = int(entry['offline_timestamp'] / 1000)
				artist = entry['master_metadata_album_artist_name']
				title = entry['master_metadata_track_name']

@@ -126,24 +144,42 @@ def parse_spotify(inputf):

				if title is None:
					warn(f"{entry} has no title, skipping...")
						yield False
					yield ('SKIP',None)
					continue
				if artist is None:
					warn(f"{entry} has no artist, skipping...")
						yield False
					yield ('SKIP',None)
					continue
					if sec < 30:
				if played < 30:
					warn(f"{entry} is shorter than 30 seconds, skipping...")
						yield False
					yield ('SKIP',None)
					continue

				# if offline_timestamp is a proper number, we treat it as
				# accurate and check duplicates by that exact timestamp
				if timestamp != 0:
					status = 'SUCCESS'
					if timestamp in timestamps and (artist,title) in timestamps[timestamp]:
						warn(f"{entry} seems to be a duplicate, skipping...")
						yield False
						yield ('SKIP',None)
						continue

					timestamps.setdefault(timestamp,[]).append((artist,title))

					yield {
				# if it's 0, we use ts instead, but identify duplicates much more
				# liberally (cause the ts is not accurate)
				else:
					status = 'WARN'
					warn(f"{entry} might have an inaccurate timestamp.")
					timestamp = int(
						datetime.datetime.strptime(entry['ts'].replace('Z','+0000',),"%Y-%m-%dT%H:%M:%S%z").timestamp()
					)
					# TODO HEURISTICS




				yield (status,{
					'title':title,
					'artiststr': artist,
					'album': album,

@@ -152,11 +188,11 @@ def parse_spotify(inputf):
				#		"%Y-%m-%dT%H:%M:%S%z"
				#	).timestamp()),
					'timestamp': timestamp,
						'duration':sec
					}
					'duration':played
				})
			except Exception as e:
				err(f"{entry} could not be parsed. Scrobble not imported. ({repr(e)})")
					yield None
				yield ('FAIL',None)
				continue

		print()
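The hunks above split timestamp handling into two paths: an exact offline_timestamp is used for strict duplicate detection, while entries with an offline_timestamp of 0 fall back to the ISO 'ts' string and are imported with a 'WARN' status. A condensed sketch of that logic (illustrative only; field names are taken from Spotify's export as shown in the diff):

import datetime

timestamps = {}   # exact timestamp -> [(artist, title), ...] seen so far

def resolve_timestamp(entry):
	timestamp = int(entry.get('offline_timestamp', 0) / 1000)
	if timestamp != 0:
		# accurate timestamp: deduplicate on the exact second plus track identity
		key = (entry['master_metadata_album_artist_name'], entry['master_metadata_track_name'])
		if key in timestamps.setdefault(timestamp, []):
			return 'SKIP', None
		timestamps[timestamp].append(key)
		return 'SUCCESS', timestamp
	# no usable offline_timestamp: fall back to the ISO 'ts' field and flag the entry
	parsed = datetime.datetime.strptime(entry['ts'].replace('Z', '+0000'), "%Y-%m-%dT%H:%M:%S%z")
	return 'WARN', int(parsed.timestamp())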
@@ -170,12 +206,12 @@ def parse_lastfm(inputf):
			try:
				artist,album,title,time = row
			except ValueError:
				warn(f"{row} does not look like a valid entry. Scrobble not imported.")
				yield None
				err(f"{row} does not look like a valid entry. Scrobble not imported.")
				yield ('FAIL',None)
				continue

			try:
				yield {
				yield ('SUCCESS',{
					'title': title,
					'artiststr': artist,
					'album': album,

@@ -184,8 +220,8 @@ def parse_lastfm(inputf):
						"%d %b %Y %H:%M%z"
					).timestamp()),
					'duration':None
				}
				})
			except Exception as e:
				err(f"{entry} could not be parsed. Scrobble not imported. ({repr(e)})")
				yield None
				yield ('FAIL',None)
				continue
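The Last.fm hunks keep the existing timestamp conversion and only adapt the parser to the new (status, scrobble) contract; the CSV rows carry times like "22 Feb 2022 21:47", parsed with "%d %b %Y %H:%M%z". A small sketch of that conversion, with the UTC offset appended explicitly (treating the export's times as UTC is an assumption here):

import datetime

# Convert a Last.fm export time string to a Unix timestamp, assuming UTC.
def lastfm_time_to_timestamp(value):
	return int(datetime.datetime.strptime(value + '+0000', "%d %b %Y %H:%M%z").timestamp())

# lastfm_time_to_timestamp("22 Feb 2022 21:47") == 1645566420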