--- /dev/null
+#!/usr/bin/env python
+
+'''
+BSD 3 License
+Copyright 2024 Julien Selamme
+
+Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
+
+1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the distribution.
+
+3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote products derived
+from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
+OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
+OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+'''
+
+import argparse
+import filecmp # diff
+import os
+import queue
+import shutil # copy2
+import signal
+import subprocess
+import sys
+import threading
+import typing
+import urllib.parse # Decode XSPF URL formatted lines
+import xml.etree.ElementTree # Read XSPF files
+
+# GUI
+import tkinter as tk
+import tkinter.filedialog as tkfd
+import tkinter.scrolledtext as tkst
+
+# As of now only ffmpeg converter is supported.
+CONVERTER_NAME: str = "ffmpeg"
+
+class LibraryTrack:
+ def __init__(self, trackDir: str, trackFile: str) -> None:
+ self.trackDir: str = trackDir # Absolute path.
+ self.trackFile: str = trackFile # With extension.
+
+class LibraryPlaylist:
+ def __init__(self, name: str) -> None:
+ self.name: str = name
+ self.tracks: list[LibraryTrack] = []
+
+ def add(self, trackDir: str, trackFile: str) -> None:
+ self.tracks.append(LibraryTrack(trackDir, trackFile))
+
+class Task:
+ def __init__(self, taskType: str, converterArgs: list[str],
+ inputFilePath: str, outputFilePath: str) -> None:
+ self.taskType: str = taskType # Convert or copy.
+ self.converterArgs: list[str] = converterArgs
+ self.inputFilePath: str = inputFilePath
+ self.outputFilePath: str = outputFilePath
+
+class ConvertParams:
+ def __init__(self, playlistDir: str, targetDir: str,
+ convertProfile: str, converterPath: str,
+ trimNTrack: bool, jobs: int) -> None:
+ self.playlistDir: str = playlistDir
+ self.targetDir: str = targetDir
+ self.convertProfile: str = convertProfile
+ self.converterPath: str = converterPath
+ self.trimNTrack: bool = trimNTrack
+ self.jobs: int = jobs
+
+class ConvertProfile:
+ def __init__(self) -> None:
+ self.name: str = "none"
+ self.targetExtension: str = ""
+ self.convertExtensions: list[str] = list()
+ self.copyExtensions: list[str] = list()
+ self.converterArgs: list[str] = list()
+
+ def init(self, profileName: str) -> bool:
+ # -n to not overwrite any existing file.
+ # -ar 44100 for 44.1 kHz audio output.
+ # -fps_mode vfr to drop frames which cannot be
+ # properly contained in the output file.
+ commonFlags: list[str] = ["-hide_banner", "-loglevel", "error",
+ "-n", "-ar", "44100", "-fps_mode", "vfr"]
+
+ # -sample_fmt s16 for 16 bits audio output.
+ # -write_id3v2 1 to write metadata in aiff files.
+ if profileName == "aiff":
+ self.name = "aiff"
+ self.targetExtension = ".aiff"
+ self.convertExtensions = [".aiff", ".flac", ".wav"]
+ self.copyExtensions = [".mp3"]
+ self.converterArgs = (commonFlags
+ + ["-f", "aiff", "-sample_fmt", "s16",
+ "-write_id3v2", "1"])
+ return True
+ elif profileName == "flac":
+ self.name = "flac"
+ self.targetExtension = ".flac"
+ self.convertExtensions = [".aiff", ".flac", ".wav"]
+ self.copyExtensions = [".mp3"]
+ self.converterArgs = (commonFlags
+ + ["-f", "flac", "-sample_fmt", "s16"])
+ return True
+ # -q:a 0 to set mp3 variable bitrate.
+ elif profileName == "mp3":
+ self.name = "mp3"
+ self.targetExtension = ".mp3"
+ self.convertExtensions = [".aiff", ".flac", ".wav"]
+ self.copyExtensions = [".mp3"]
+ self.converterArgs = (commonFlags
+ + ["-f", "mp3",
+ "-c:a", "libmp3lame", "-q:a", "0"])
+ return True
+ else:
+ return False
+
+ def isTargetExtension(self, extension: str) -> bool:
+ return extension == self.targetExtension
+
+ def isConvertExtension(self, extension: str) -> bool:
+ return extension in self.convertExtensions
+
+ def isCopyExtension(self, extension: str) -> bool:
+ return extension in self.copyExtensions
+
+class Stats:
+ def __init__(self) -> None:
+ self.errors: int = 0
+ self.converted: int = 0
+ self.copied: int = 0
+ self.removed: int = 0
+ self.updated: int = 0
+ self.duplicates: int = 0
+ self.skipped: int = 0
+
+ def add(self, stats) -> None:
+ self.errors += stats.errors
+ self.converted += stats.converted
+ self.copied += stats.copied
+ self.removed += stats.removed
+ self.updated += stats.updated
+ self.duplicates += stats.duplicates
+ self.skipped += stats.skipped
+
+# Main converter class. Thread safe.
+class Converter:
+ def __init__(self, outstreamFunction: typing.Callable[[str], None]) -> None:
+ self._running: bool = False
+ self._stopSignal: bool = False
+ self._runLock: threading.Lock = threading.Lock()
+ self._outstreamFunction: typing.Callable[[str], None] = outstreamFunction
+ self._logLock: threading.Lock = threading.Lock()
+ self._statsLock: threading.Lock = threading.Lock()
+
+ self._TARGET_PLAYLIST_EXT: str = ".m3u"
+ # Used for temporary files made from copy and conversion.
+ self._TMP_EXT: str = ".tmp"
+ # As of now only ffmpeg converter is supported.
+ self._CONVERTER_CHECK: str = "-version"
+ self._CONVERTER_IN: str = "-i"
+
+ # Check for unused albums.
+ def check(self, playlistDir: str, libraryDir: str) -> None:
+ if not self._start():
+ return
+
+ self._check(playlistDir, libraryDir)
+ self._end()
+
+ def _check(self, playlistDir: str, libraryDir: str) -> None:
+ if not os.path.isdir(playlistDir):
+ self._log("Error: Invalid playlist directory '"
+ + playlistDir + "'.")
+ return
+
+ if not os.path.isdir(libraryDir):
+ self._log("Error: Invalid library directory '"
+ + libraryDir + "'.")
+ return
+
+ # List of albums/directories from library directory.
+ # Absolute paths are used so each album is considered unique.
+ albums: set[str] = set()
+ # Key is album/directory (absolute path).
+ # Value is how many times a track from this album is used.
+ presenceMap: dict[str, int] = {}
+
+ # Initialize album list.
+ for r, d, f in os.walk(libraryDir):
+ if self._getStopSignal() == True:
+ return
+
+ if len(d) == 0: # No sub directory means it's an album directory.
+ albums.add(os.path.abspath(r))
+
+ for playlistFile in os.listdir(playlistDir):
+ if self._getStopSignal() == True:
+ return
+
+ libPlaylist = self._createLibraryPlaylistFromFile(os.path.join(playlistDir,
+ playlistFile))
+ for track in libPlaylist.tracks:
+ trackFilePath = os.path.join(track.trackDir,
+ track.trackFile)
+
+ if not os.path.isfile(trackFilePath):
+ self._log("Warning: '" + trackFilePath
+ + "' in playlist '" + libPlaylist.name + "' not found.")
+
+ # If an album contains a track from any playlist remove it.
+ if track.trackDir in albums:
+ albums.remove(track.trackDir)
+
+ # Print remaining unused albums.
+ if len(albums) > 0:
+ self._log("Unused albums:")
+
+ for album in sorted(albums):
+ self._log(album)
+
+ # Convert playlist content to target directory.
+ def convert(self, convertParams: ConvertParams) -> None:
+ if not self._start():
+ return
+
+ self._convert(convertParams)
+ self._end()
+
+ def _convert(self, convertParams: ConvertParams) -> None:
+ if not os.path.isdir(convertParams.playlistDir):
+ self._log("Error: Invalid playlist directory '"
+ + convertParams.playlistDir + "'.")
+ return
+
+ if not os.path.isdir(convertParams.targetDir):
+ self._log("Error: Invalid target directory '"
+ + convertParams.targetDir + "'.")
+ return
+
+ convertProfile: ConvertProfile = ConvertProfile()
+
+ if not convertProfile.init(convertParams.convertProfile):
+ self._log("Error: Invalid convert profile '"
+ + convertParams.convertProfile + "'.")
+ return
+
+ if not self.checkConverter(convertParams.converterPath,
+ self._CONVERTER_CHECK):
+ self._log("Error: Could not start converter '"
+ + convertParams.converterPath + "'.")
+ return
+
+ if convertParams.jobs < 1:
+ self._log("Error: Number of concurrent jobs must be at least 1.")
+ return
+
+ # Playlists generated from playlist files.
+ libPlaylists: dict[str, LibraryPlaylist] = self._createLibraryPlaylists(convertParams.playlistDir)
+
+ if len(libPlaylists) == 0:
+ self._log("No playlist found.")
+ return
+
+ # Existing files in target directory to be removed if not found in playlists.
+ unmatchedFiles: set[str] = set()
+ # Processed files. To handle duplicates (same file from multiple playlists).
+ processedFiles: set[str] = set()
+ # Existing playlists in target directory to be removed
+ # if not found in original playlist directory.
+ unmatchedPlaylistFiles: set[str] = set()
+ # Total playlist track count.
+ count: int = 0
+ stats: Stats = Stats()
+ # Task queue.
+ q: queue.SimpleQueue = queue.SimpleQueue()
+
+ # Build existing file sets from target directory.
+ for tFile in os.listdir(convertParams.targetDir):
+ if not os.path.isfile(os.path.join(convertParams.targetDir,
+ tFile)):
+ continue
+
+ tExt: str = self.getFileExtension(tFile).lower()
+
+ if (convertProfile.isTargetExtension(tExt)
+ or convertProfile.isCopyExtension(tExt)):
+ unmatchedFiles.add(tFile)
+ elif tExt == self._TARGET_PLAYLIST_EXT:
+ unmatchedPlaylistFiles.add(tFile)
+ # else ? File tFile not handled.
+
+ # Build task queue and update playlist files.
+ for p in libPlaylists:
+ if self._getStopSignal() == True:
+ self._log("Stopped.")
+ return
+
+ playlist: LibraryPlaylist = libPlaylists[p]
+ targetPlaylistFile: str = (playlist.name
+ + self._TARGET_PLAYLIST_EXT)
+ targetFileList: list[str] = []
+
+ if targetPlaylistFile in unmatchedPlaylistFiles:
+ unmatchedPlaylistFiles.remove(targetPlaylistFile)
+
+ for track in playlist.tracks:
+ trackFilePath: str = os.path.join(track.trackDir,
+ track.trackFile)
+
+ if not os.path.isfile(trackFilePath):
+ self._log("Error: '" + trackFilePath + "' in playlist '"
+ + playlist.name + "' is not a file.")
+ stats.errors += 1
+ continue
+
+ # Default task type: copy.
+ taskType: str = "copy"
+ targetName: str = self.trimFileExtension(track.trackFile)
+ targetExt: str = self.getFileExtension(track.trackFile).lower()
+
+ # Convert.
+ if convertProfile.isConvertExtension(targetExt):
+ targetExt = convertProfile.targetExtension
+ taskType = "convert"
+ # Do nothing if extension not found in profile.
+ elif not convertProfile.isCopyExtension(targetExt):
+ self._log("Error: Unsupported track file '"
+ + trackFilePath + "' in playlist '"
+ + playlist.name + "' for profile '"
+ + convertProfile.name + "'.")
+ stats.errors += 1
+ continue
+
+ targetFileName: str = targetName
+
+ # Replace m3u comment character.
+ targetFileName = targetFileName.replace('#', '_')
+
+ if convertParams.trimNTrack:
+ targetFileName = self.trimTrackFileName(targetFileName)
+
+ targetFile: str = targetFileName + targetExt
+ targetFilePath: str = os.path.join(convertParams.targetDir,
+ targetFile)
+
+ targetFileList.append(targetFile)
+
+ # Target already exists.
+ if targetFile in unmatchedFiles:
+ unmatchedFiles.remove(targetFile)
+
+ if not os.path.isfile(targetFilePath):
+ self._log("Error: Existing target element '"
+ + targetFilePath + "' is not a valid file.")
+ stats.errors += 1
+ elif self.isFileChanged(trackFilePath, targetFilePath):
+ processedFiles.add(targetFile)
+ q.put(Task(taskType, convertProfile.converterArgs,
+ trackFilePath, targetFilePath))
+ else:
+ processedFiles.add(targetFile)
+ stats.skipped += 1
+ # Target already processed.
+ elif targetFile in processedFiles:
+ stats.duplicates +=1
+ # Target does not exist.
+ else:
+ processedFiles.add(targetFile)
+ q.put(Task(taskType, convertProfile.converterArgs,
+ trackFilePath, targetFilePath))
+ # /for track in playlist.tracks
+
+ count += len(playlist.tracks)
+
+ # Replace existing playlist file.
+ self._updatePlaylistFile(targetFileList,
+ os.path.join(convertParams.targetDir, targetPlaylistFile))
+ # /for playlist in libPlaylists
+
+ if self._getStopSignal() == True:
+ self._log("Stopped.")
+ return
+
+ # Delete non matched playlists.
+ for f in unmatchedPlaylistFiles:
+ self._log("Removing non matched playlist '" + f + "'.")
+ os.remove(os.path.join(convertParams.targetDir, f))
+
+ # Delete non matched tracks.
+ for f in unmatchedFiles:
+ self._log("Removing non matched file '" + f + "'.")
+ os.remove(os.path.join(convertParams.targetDir, f))
+ stats.removed += 1
+
+ threads: list[threading.Thread] = []
+
+ for i in range(0, convertParams.jobs - 1):
+ # Create worker thread.
+ threads.append(threading.Thread(target=self._processQueue,
+ args=(q, convertParams.converterPath, stats)))
+
+ # A thread stops when it gets None from queue.
+ q.put(None)
+
+ threads[i].start()
+
+ # Join workers.
+ q.put(None)
+ self._processQueue(q, convertParams.converterPath, stats)
+
+ # Wait for workers.
+ for i in range(0, len(threads)):
+ threads[i].join()
+
+ self._log("From " + str(count) + " elements in "
+ + str(len(libPlaylists)) + " playlists:"
+ + " Errors " + str(stats.errors)
+ + " Converted " + str(stats.converted)
+ + " Copied " + str(stats.copied)
+ + " Updated " + str(stats.updated)
+ + " Removed " + str(stats.removed)
+ + " Duplicates " + str(stats.duplicates)
+ + " Skipped " + str(stats.skipped) + ".")
+
+ # Main function started by threads. Process queue elements
+ # until 'None' is found or stop signal is received.
+ def _processQueue(self, q: queue.SimpleQueue,
+ converterPath: str, stats: Stats) -> None:
+ localStats: Stats = Stats()
+
+ # Process elements until getting 'None'.
+ while True:
+ queueElement: Task = q.get()
+
+ if queueElement == None or self._getStopSignal() == True:
+ break
+
+ self._processTask(queueElement, converterPath, localStats)
+ #q.task_done()
+
+ # Update global stats.
+ self._statsLock.acquire()
+ stats.add(localStats)
+ self._statsLock.release()
+
+ # Copy or convert a music file.
+ def _processTask(self, task: Task, converterPath: str,
+ stats: Stats) -> None:
+ if not os.path.isfile(task.inputFilePath):
+ self._log("Error: Playlist element '" + task.inputFilePath
+ + "' is not a file.")
+ stats.errors += 1
+ return
+
+ update: bool = os.path.isfile(task.outputFilePath)
+ outputFilePathWip: str = task.outputFilePath + self._TMP_EXT
+
+ for f in [task.outputFilePath, outputFilePathWip]:
+ if os.path.isfile(f):
+ self._log("Removing outdated file '" + f + "'.")
+ os.remove(f)
+ elif os.path.exists(f):
+ self._log("Error: Existing non file element'" + f + "'.")
+ stats.errors += 1
+ return
+
+ if task.taskType == "copy":
+ self._log("Copying '" + task.inputFilePath
+ + "' into '" + task.outputFilePath + "'.")
+ self.copyFile(task.inputFilePath, outputFilePathWip)
+ os.rename(outputFilePathWip, task.outputFilePath)
+ stats.copied += 1
+
+ if update:
+ stats.updated +=1
+ elif task.taskType == "convert":
+ command: list[str] = ([converterPath] + [self._CONVERTER_IN]
+ + [task.inputFilePath] + task.converterArgs
+ + [outputFilePathWip])
+ self._log("Converting '" + task.inputFilePath + "' to '"
+ + task.outputFilePath + "'.")
+ result: subprocess.CompletedProcess = subprocess.run(command, capture_output=True, text=True)
+
+ if result.returncode == 0:
+ os.rename(outputFilePathWip, task.outputFilePath)
+ stats.converted += 1
+
+ if update:
+ stats.updated +=1
+ else:
+ self._log("Error from converter: " + result.stderr)
+ stats.errors += 1
+ else: # Unknown task
+ self._log("Error: Skipping non valid task type '" + task.taskType
+ + "' file '" + task.inputFilePath + "'.")
+ stats.errors += 1
+
+ # Create library playlists from a base directory.
+ def _createLibraryPlaylists(self, directory: str) -> dict[str, LibraryPlaylist]:
+ libPlaylists: dict[str, LibraryPlaylist] = {}
+
+ if not os.path.isdir(directory):
+ self._log("Warning: '" + directory + "' is not a valid directory.")
+ else:
+ for element in os.listdir(directory):
+ path: str = os.path.join(directory, element)
+
+ if not os.path.isfile(path):
+ continue
+
+ playlistName: str = self.trimFileExtension(element)
+
+ if playlistName in libPlaylists:
+ self._log("Warning: Duplicate playlist name '"
+ + playlistName + "'.")
+ else:
+ libPlaylist: LibraryPlaylist = self._createLibraryPlaylistFromFile(path)
+
+ if len(libPlaylist.tracks) == 0:
+ self._log("Warning: Empty library playlist '"
+ + libPlaylist.name + "'.")
+ else:
+ libPlaylists[playlistName] = libPlaylist
+
+ return libPlaylists
+
+ # Create a library playlist from a playlist file.
+ def _createLibraryPlaylistFromFile(self, filePath: str) -> LibraryPlaylist:
+ playlist: LibraryPlaylist = LibraryPlaylist(self.trimFileExtension(os.path.basename(filePath)))
+
+ if (self.getFileExtension(filePath).lower() == ".m3u"
+ or self.getFileExtension(filePath).lower() == ".m3u8"):
+ self._loadTracksFromM3u(filePath, playlist)
+ elif self.getFileExtension(filePath).lower() == ".xspf":
+ self._loadTracksFromXspf(filePath, playlist)
+ else:
+ self._log("Warning: Unsupported playlist extension for file '"
+ + filePath + "'.")
+
+ return playlist
+
+ # Read m3u playlist file to get all its tracks.
+ def _loadTracksFromM3u(self, filePath: str, playlist: LibraryPlaylist) -> None:
+ m3uFile: typing.IO[str] = self.openFile(filePath, "rt")
+
+ # Each line is an string path.
+ for line in m3uFile:
+ string: str = line.rstrip()
+
+ # Starting with '#' means it's a comment.
+ if len(string) == 0 or string[0] == "#":
+ continue
+
+ playlist.add(os.path.dirname(string), os.path.basename(string))
+
+ # Read xspf playlist file to get all its tracks.
+ def _loadTracksFromXspf(self, filePath: str, playlist: LibraryPlaylist) -> None:
+ xspf: xml.etree.ElementTree.Element = xml.etree.ElementTree.parse(filePath).getroot()
+ xmlns: str = "http://xspf.org/ns/0/"
+
+ # Each element is a string path.
+ for track in xspf[0]: # <track> in <tracks>
+ element: typing.Optional[xml.etree.ElementTree.Element] = track.find("{" + xmlns + "}location")
+
+ if element is None:
+ continue
+
+ urlDecodedString: str = urllib.parse.unquote(str(element.text))
+ playlist.add(os.path.dirname(urlDecodedString),
+ os.path.basename(urlDecodedString))
+
+ # Create or update playlist file from a directory.
+ def _updatePlaylistFile(self, playlist: list[str], playlistPath: str) -> None:
+ oriPlFilePath: str = playlistPath
+ repPlFilePath: str = oriPlFilePath + self._TMP_EXT
+
+ self._buildM3uPlaylistFile(playlist, repPlFilePath)
+
+ # Replace old playlist file if content is different.
+ if os.path.isfile(oriPlFilePath):
+ # Do nothing if new playlist file is identical to the previous one.
+ if self.isFileIdentical(oriPlFilePath, repPlFilePath):
+ os.remove(repPlFilePath)
+ return
+
+ self._log("Updating outdated playlist file '" + oriPlFilePath + "'.")
+ os.remove(oriPlFilePath)
+ else:
+ self._log("Adding playlist file '" + oriPlFilePath + "'.")
+
+ os.rename(repPlFilePath, oriPlFilePath)
+
+ # Build a m3u file from a playlist.
+ def _buildM3uPlaylistFile(self, playlist: list[str], filePath: str) -> None:
+ plFile: typing.IO[str] = self.openFile(filePath, "wt")
+
+ # Build new playlist file.
+ for track in playlist:
+ plFile.write(track + "\n")
+
+ plFile.close()
+
+ # Prevent the same converter from running twice.
+ def _start(self) -> bool:
+ result: bool = False
+ self._runLock.acquire()
+
+ if self._running:
+ self._log("Error: This conveter is already running.")
+ else:
+ self._running = True
+ self._stopSignal = False
+ result = True
+
+ self._runLock.release()
+ return result
+
+ def _end(self) -> None:
+ self._runLock.acquire()
+ self._running = False
+ self._runLock.release()
+
+ # Called from external thread.
+ def stop(self) -> None:
+ self._log("Received stop signal. Stopping ongoing operations...")
+
+ self._runLock.acquire()
+ self._stopSignal = True
+ self._runLock.release()
+
+ def _getStopSignal(self) -> bool:
+ self._runLock.acquire()
+ result = self._stopSignal
+ self._runLock.release()
+ return result
+
+ def _log(self, message: str) -> None:
+ self._logLock.acquire()
+ self._outstreamFunction(message)
+ self._logLock.release()
+
+ # Trim file name to remove leader track number.
+ def trimTrackFileName(self, trackFileName: str) -> str:
+ n: int = 0
+
+ # Go past leading numbers.
+ for e in trackFileName:
+ if e.isdigit():
+ n = n + 1
+ else:
+ break
+
+ # NN - NAME, NN . NAME
+ if (len(trackFileName) > n + 3
+ and trackFileName[n:n+3] in [" - ", " . "]):
+ return trackFileName[n+3:]
+ # NN- NAME, NN -NAME, NN. NAME, NN .NAME
+ elif (len(trackFileName) > n + 2
+ and trackFileName[n:n+2] in ["- ", " -", ". ", " ."]):
+ return trackFileName[n+2:]
+ # NN-NAME, NN.NAME, NN NAME
+ elif (len(trackFileName) > n + 1
+ and trackFileName[n:n+1] in ["-", ".", " "]):
+ return trackFileName[n+1:]
+ else:
+ return trackFileName
+
+ def trimFileExtension(self, filePath: str) -> str:
+ return os.path.splitext(filePath)[0]
+
+ def getFileExtension(self, filePath: str) -> str:
+ return os.path.splitext(filePath)[1]
+
+ def openFile(self, path: str, args: str) -> typing.IO[str]:
+ return open(path, args, encoding="utf8")
+
+ def makeDir(self, dirPath: str) -> None:
+ os.makedirs(dirPath, exist_ok=True)
+
+ def copyFile(self, oriFilePath: str, newFilePath: str) -> None:
+ shutil.copy2(oriFilePath, newFilePath)
+
+ # Compare last modification date.
+ def isFileChanged(self, sourceFilePath: str,
+ targetFilePath: str) -> bool:
+ return (os.path.getmtime(sourceFilePath)
+ > os.path.getmtime(targetFilePath))
+
+ def isFileIdentical(self, firstFilePath: str,
+ secondFilePath: str) -> bool:
+ return filecmp.cmp(firstFilePath, secondFilePath,
+ shallow=False)
+
+ def checkConverter(self, converterPath: str,
+ converterArg: str) -> bool:
+ try:
+ return subprocess.run([converterPath, converterArg],
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL).returncode == 0
+ except:
+ return False;
+# /Converter
+
+class GUI:
+ def __init__(self) -> None:
+ self.converter: Converter = Converter(self.log)
+
+ self.rootWindow: tk.Tk = tk.Tk()
+ self.rootWindow.title("Music Library Converter")
+
+ self.pickGrid: tk.Frame = tk.Frame(self.rootWindow)
+
+ self.plDirLabel: tk.Label = tk.Label(self.pickGrid, text="Playlists directory")
+ self.plDirEntry: tk.Entry = tk.Entry(self.pickGrid)
+ self.plDirButton: tk.Button = tk.Button(self.pickGrid,
+ command=lambda: self.setDirEntryFromPicker(self.plDirEntry), text="Browse")
+
+ self.targetDirLabel: tk.Label = tk.Label(self.pickGrid, text="Target directory")
+ self.targetDirEntry: tk.Entry = tk.Entry(self.pickGrid)
+ self.targetDirButton: tk.Button = tk.Button(self.pickGrid,
+ command=lambda: self.setDirEntryFromPicker(self.targetDirEntry), text="Browse")
+
+ self.converterPathLabel: tk.Label = tk.Label(self.pickGrid, text="Converter path")
+ self.converterPathEntry: tk.Entry = tk.Entry(self.pickGrid)
+ self.converterPathEntry.insert(tk.INSERT, CONVERTER_NAME)
+ self.converterPathButton: tk.Button = tk.Button(self.pickGrid,
+ command=lambda: self.setFileEntryFromPicker(self.converterPathEntry), text="Browse")
+
+ self.runFrame: tk.Frame = tk.Frame(self.rootWindow)
+
+ self.runButton: tk.Button = tk.Button(self.runFrame,
+ command=self.run, text="Run Configuration")
+
+ self.cancelButton: tk.Button = tk.Button(self.runFrame,
+ command=self.cancel, text="Cancel")
+ self.cancelButton["state"] = "disabled"
+
+ self.actionLabel: tk.Label = tk.Label(self.runFrame,
+ text="Action")
+ self.action: tk.StringVar = tk.StringVar(self.runFrame)
+ self.action.set("convert")
+ self.actionOption: tk.OptionMenu = tk.OptionMenu(self.runFrame,
+ self.action, "convert", "check")
+ self.actionOption.config(width=8)
+
+ self.profileLabel: tk.Label = tk.Label(self.runFrame,
+ text="Profile")
+ self.profile: tk.StringVar = tk.StringVar(self.runFrame)
+ self.profile.set("mp3")
+ self.profileOption: tk.OptionMenu = tk.OptionMenu(self.runFrame,
+ self.profile, "mp3", "aiff", "flac")
+ self.profileOption.config(width=5)
+
+ self.trim: tk.IntVar = tk.IntVar(value=1)
+ self.trimCheck: tk.Checkbutton = tk.Checkbutton(self.runFrame,
+ text="Trim track N", variable=self.trim)
+
+ self.jobsLabel: tk.Label = tk.Label(self.runFrame,
+ text="Jobs")
+ self.jobs: tk.StringVar = tk.StringVar(value="1")
+ self.jobsOption: tk.OptionMenu = tk.OptionMenu(self.runFrame,
+ self.jobs, "1", "2", "3", "4", "5", "6", "7", "8", "9", "10")
+
+ self.logOutput: tkst.ScrolledText = tkst.ScrolledText(self.rootWindow)
+ self.logOutput.configure(state="disabled")
+ self.logLock: threading.Lock = threading.Lock()
+
+ def init(self) -> None:
+ self.rootWindow.grid_columnconfigure(0, weight=1)
+ self.rootWindow.grid_rowconfigure(2, weight=1)
+
+ self.pickGrid.grid(row=0, pady=5, sticky=tk.N+tk.E+tk.S+tk.W)
+ self.pickGrid.grid_columnconfigure(2, weight=1)
+
+ self.plDirLabel.grid(row=0, column=0, padx=5, sticky=tk.W)
+ self.plDirButton.grid(row=0, column=1, padx=5, sticky=tk.W)
+ self.plDirEntry.grid(row=0, column=2, padx=5, sticky=tk.E+tk.W)
+ self.targetDirLabel.grid(row=1, column=0, padx=5, sticky=tk.W)
+ self.targetDirButton.grid(row=1, column=1, padx=5, sticky=tk.W)
+ self.targetDirEntry.grid(row=1, column=2, padx=5, sticky=tk.E+tk.W)
+ self.converterPathLabel.grid(row=2, column=0, padx=5, sticky=tk.W)
+ self.converterPathButton.grid(row=2, column=1, padx=5, sticky=tk.W)
+ self.converterPathEntry.grid(row=2, column=2, padx=5, sticky=tk.E+tk.W)
+
+ self.runFrame.grid(row=1, sticky=tk.N+tk.S+tk.W)
+
+ self.runButton.pack(side=tk.LEFT, padx=(5, 5))
+ self.cancelButton.pack(side=tk.LEFT, padx=(0, 5))
+ self.actionLabel.pack(side=tk.LEFT, padx=(0, 5))
+ self.actionOption.pack(side=tk.LEFT, padx=(0, 5))
+ self.profileLabel.pack(side=tk.LEFT, padx=(0, 5))
+ self.profileOption.pack(side=tk.LEFT, padx=(0, 5))
+ self.trimCheck.pack(side=tk.LEFT, padx=(0, 5))
+ self.jobsLabel.pack(side=tk.LEFT, padx=(0, 5))
+ self.jobsOption.pack(side=tk.LEFT, padx=(0, 5))
+
+ self.logOutput.grid(row=2, padx=5, pady=5, sticky=tk.N+tk.E+tk.S+tk.W)
+
+ self.rootWindow.mainloop()
+
+ def run(self) -> None:
+ self.plDirButton["state"] = "disabled"
+ self.plDirEntry["state"] = "disabled"
+ self.targetDirButton["state"] = "disabled"
+ self.targetDirEntry["state"] = "disabled"
+ self.converterPathButton["state"] = "disabled"
+ self.converterPathEntry["state"] = "disabled"
+ self.runButton["state"] = "disabled"
+ self.actionOption["state"] = "disabled"
+ self.profileOption["state"] = "disabled"
+ self.trimCheck["state"] = "disabled"
+ self.jobsOption["state"] = "disabled"
+ self.cancelButton["state"] = "normal"
+
+ # Clear log.
+ self.logOutput.configure(state="normal")
+ self.logOutput.delete("1.0", tk.END)
+ self.logOutput.configure(state="disabled")
+
+ thread: threading.Thread = threading.Thread(target=self.process)
+ thread.start()
+
+ def cancel(self) -> None:
+ self.cancelButton["state"] = "disabled"
+ self.converter.stop()
+
+ def process(self) -> None:
+ if self.action.get() == "check":
+ self.converter.check(self.plDirEntry.get(), self.targetDirEntry.get())
+ else:
+ self.converter.convert(
+ ConvertParams(self.plDirEntry.get(), self.targetDirEntry.get(),
+ self.profile.get(), self.converterPathEntry.get(),
+ bool(self.trim.get()), int(self.jobs.get())))
+
+ # Processing is done.
+ self.plDirButton["state"] = "normal"
+ self.plDirEntry["state"] = "normal"
+ self.targetDirButton["state"] = "normal"
+ self.targetDirEntry["state"] = "normal"
+ self.converterPathButton["state"] = "normal"
+ self.converterPathEntry["state"] = "normal"
+ self.runButton["state"] = "normal"
+ self.actionOption["state"] = "normal"
+ self.profileOption["state"] = "normal"
+ self.trimCheck["state"] = "normal"
+ self.jobsOption["state"] = "normal"
+ self.cancelButton["state"] = "disabled"
+
+ def log(self, message: str) -> None:
+ self.logLock.acquire()
+ self.logOutput.configure(state="normal")
+
+ if len(message) > 0 and message[-1] == "\n":
+ self.logOutput.insert(tk.END, message)
+ else:
+ self.logOutput.insert(tk.END, message + "\n")
+
+ #self.logOutput.see(tk.END) Causes segfault
+ self.logOutput.configure(state="disabled")
+ self.logLock.release()
+
+ def setDirEntryFromPicker(self, entry) -> None:
+ choice: str = tkfd.askdirectory()
+
+ if len(choice) != 0:
+ entry.delete(0, tk.END)
+ entry.insert(tk.INSERT, choice)
+
+ def setFileEntryFromPicker(self, entry) -> None:
+ choice: str = tkfd.askopenfilename()
+
+ if len(choice) != 0:
+ entry.delete(0, tk.END)
+ entry.insert(tk.INSERT, choice)
+# /GUI
+
+def cli_print_message(message: str) -> None:
+ if len(message) > 0 and message[-1] == "\n":
+ print(message, end="")
+ else:
+ print(message)
+
+def cli_main() -> None:
+ parser: argparse.ArgumentParser = argparse.ArgumentParser(
+ description="Music library converter",
+ formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+
+ parser.add_argument("playlist-dir", help="Playlist directory",
+ type=str)
+ parser.add_argument("target-dir", help="Target directory",
+ type=str)
+ parser.add_argument("--profile", "-p", help="Conversion profile",
+ type=str, default="mp3")
+ parser.add_argument("--converter", "-c", help="Converter path",
+ type=str, default=CONVERTER_NAME)
+ parser.add_argument("--trim", "-t", help="Trim track numbers in file names",
+ action="store_true")
+ parser.add_argument("--jobs", "-j", help="Number of concurrent jobs",
+ type=int, default=1)
+ parser.add_argument("--check", help="Check mode", action="store_true")
+
+ args: dict = vars(parser.parse_args())
+ converter: Converter = Converter(cli_print_message)
+
+ def cli_stop_signal(signal, frame):
+ converter.stop()
+
+ signal.signal(signal.SIGINT, cli_stop_signal)
+
+ if args["check"] == True:
+ converter.check(args["playlist-dir"], args["target-dir"])
+ else:
+ converter.convert(ConvertParams(args["playlist-dir"], args["target-dir"],
+ args["profile"], args["converter"],
+ args["trim"], args["jobs"]))
+
+def gui_main() -> None:
+ gui: GUI = GUI()
+ gui.init()
+
+if __name__ == "__main__":
+ if len(sys.argv) <= 1:
+ gui_main()
+ else:
+ cli_main()