def add(self, trackDir: str, trackFile: str) -> None:
self.tracks.append(LibraryTrack(trackDir, trackFile))
-class Task:
- def __init__(self, taskType: str, converterArgs: list[str],
- inputFilePath: str, outputFilePath: str) -> None:
- self.taskType: str = taskType # Convert or copy.
- self.converterArgs: list[str] = converterArgs
- self.inputFilePath: str = inputFilePath
- self.outputFilePath: str = outputFilePath
-
class ConvertParams:
def __init__(self, playlistDir: str, targetDir: str,
convertProfile: str, converterPath: str,
def isCopyExtension(self, extension: str) -> bool:
return extension in self.copyExtensions
+class Task:
+ def __init__(self, taskId: int, taskType: str, converterArgs: list[str],
+ inputFilePath: str, outputFilePath: str) -> None:
+ self.taskId: int = taskId
+ self.taskType: str = taskType # Convert or copy.
+ self.converterArgs: list[str] = converterArgs
+ self.inputFilePath: str = inputFilePath
+ self.outputFilePath: str = outputFilePath
+
class Stats:
def __init__(self) -> None:
self.errors: int = 0
+ self.skipped: int = 0
+ self.duplicates: int = 0
+ self.tasks: int = 0
self.converted: int = 0
self.copied: int = 0
- self.removed: int = 0
self.updated: int = 0
- self.duplicates: int = 0
- self.skipped: int = 0
+ self.removed: int = 0
def add(self, stats) -> None:
self.errors += stats.errors
+ self.skipped += stats.skipped
+ self.duplicates += stats.duplicates
+ self.tasks += stats.tasks
self.converted += stats.converted
self.copied += stats.copied
- self.removed += stats.removed
self.updated += stats.updated
- self.duplicates += stats.duplicates
- self.skipped += stats.skipped
+ self.removed += stats.removed
# Main converter class. Thread safe.
class Converter:
# if not found in original playlist directory.
unmatchedPlaylistFiles: set[str] = set()
# Total playlist track count.
- count: int = 0
+ trackCount: int = 0
stats: Stats = Stats()
# Task queue.
q: queue.SimpleQueue = queue.SimpleQueue()
stats.errors += 1
elif self.isFileChanged(trackFilePath, targetFilePath):
processedFiles.add(targetFile)
- q.put(Task(taskType, convertProfile.converterArgs,
+ stats.tasks += 1
+ q.put(Task(stats.tasks, taskType, convertProfile.converterArgs,
trackFilePath, targetFilePath))
+
else:
processedFiles.add(targetFile)
stats.skipped += 1
# Target does not exist.
else:
processedFiles.add(targetFile)
- q.put(Task(taskType, convertProfile.converterArgs,
+ stats.tasks += 1
+ q.put(Task(stats.tasks, taskType, convertProfile.converterArgs,
trackFilePath, targetFilePath))
# /for track in playlist.tracks
- count += len(playlist.tracks)
+ trackCount += len(playlist.tracks)
# Replace existing playlist file.
self._updatePlaylistFile(targetFileList,
for i in range(0, len(threads)):
threads[i].join()
- self._log("From " + str(count) + " elements in "
+ self._log("From " + str(trackCount) + " tracks in "
+ str(len(libPlaylists)) + " playlists:"
+ " Errors " + str(stats.errors)
+ + " Skipped " + str(stats.skipped)
+ + " Duplicates " + str(stats.duplicates)
+ + " Tasks " + str(stats.tasks)
+ " Converted " + str(stats.converted)
+ " Copied " + str(stats.copied)
+ " Updated " + str(stats.updated)
- + " Removed " + str(stats.removed)
- + " Duplicates " + str(stats.duplicates)
- + " Skipped " + str(stats.skipped) + ".")
+ + " Removed " + str(stats.removed) + ".")
# Main function started by threads. Process queue elements
# until 'None' is found or stop signal is received.
def _processQueue(self, q: queue.SimpleQueue,
converterPath: str, stats: Stats) -> None:
+ self._statsLock.acquire()
+ totalTasks: int = stats.tasks
+ self._statsLock.release()
+
localStats: Stats = Stats()
# Process elements until getting 'None'.
while True:
- queueElement: Task = q.get()
+ task: Task = q.get()
- if queueElement == None or self._getStopSignal() == True:
+ if task == None or self._getStopSignal() == True:
break
- self._processTask(queueElement, converterPath, localStats)
+ self._processTask(task, converterPath, totalTasks, localStats)
#q.task_done()
# Update global stats.
self._statsLock.release()
# Copy or convert a music file.
- def _processTask(self, task: Task, converterPath: str,
+ def _processTask(self, task: Task, converterPath: str, totalTasks: int,
stats: Stats) -> None:
if not os.path.isfile(task.inputFilePath):
self._log("Error: Playlist element '" + task.inputFilePath
return
if task.taskType == "copy":
- self._log("Copying '" + task.inputFilePath
+ self._log(str(task.taskId) + "/" + str(totalTasks) + ": Copying '" + task.inputFilePath
+ "' into '" + task.outputFilePath + "'.")
self.copyFile(task.inputFilePath, outputFilePathWip)
os.rename(outputFilePathWip, task.outputFilePath)
command: list[str] = ([converterPath] + [self._CONVERTER_IN]
+ [task.inputFilePath] + task.converterArgs
+ [outputFilePathWip])
- self._log("Converting '" + task.inputFilePath + "' to '"
+ self._log(str(task.taskId) + "/" + str(totalTasks) + ": Converting '" + task.inputFilePath + "' to '"
+ task.outputFilePath + "'.")
result: subprocess.CompletedProcess = subprocess.run(command, capture_output=True, text=True)