From e20f50c83c7164ddf44041e28592f003ab16463d Mon Sep 17 00:00:00 2001 From: Julien Date: Sat, 10 Aug 2024 01:35:03 +0200 Subject: [PATCH] Initial commit on new server --- README.md | 54 +++ converter.py | 935 +++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 989 insertions(+) create mode 100644 README.md create mode 100755 converter.py diff --git a/README.md b/README.md new file mode 100644 index 0000000..c08164b --- /dev/null +++ b/README.md @@ -0,0 +1,54 @@ +# Music Library Converter + +Convert and copy music files and playlists to a single directory. + +This directory is to be used as DJ software or phone music library. + +**Python and ffmpeg are required to run this application.** + +**Input**: Playlist directory containing playlist files. + +**Output**: Target directory containing converted files with desired extension and associated playlist files. + +### Examples + +Using GUI: + +`convert.py` + +Using CLI: + +`convert.py C:\PlaylistDirectory C:\TargetConvertedDirectory` + +`convert.py --profile aiff --jobs 4 --trim --converter C:\ffmpeg\bin\ffmpeg.exe C:\PlaylistDirectory C:\TargetDirectory` + +`convert.py -p flac -j 8 -t -c C:\ffmpeg\bin\ffmpeg.exe C:\PlaylistDirectory C:\TargetDirectory` + +### Supported library playlist extensions + +- **m3u** +- **xspf** + +### Supported extension profiles + +- **mp3** (default) +- **aiff** +- **flac** + +### Supported target playlist extensions + +- **m3u** + +### Options + +- **converter**: Path to music file converter. For example `C:\ffmpeg\bin\ffmpeg.exe`. +- **trim**: Trim track numbers from file names. +- **jobs**: Number of parallel tasks to run (Multithreading). + +Music files are only replaced if their library version is more recent than their target directory version. + +Target directory playlist files are only replaced if their content changed. + +All files are converted with an external transcoder. As of now the only supported one is **ffmpeg**: https://ffmpeg.org/ + +No data is ever written outside the target directory. Transcodes are only performed on lossless files, other files are simply copied. diff --git a/converter.py b/converter.py new file mode 100755 index 0000000..2b57f34 --- /dev/null +++ b/converter.py @@ -0,0 +1,935 @@ +#!/usr/bin/env python + +''' +BSD 3 License +Copyright 2024 Julien Selamme + +Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the distribution. + +3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote products derived +from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, +THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. +IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, +OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; +OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +''' + +import argparse +import filecmp # diff +import os +import queue +import shutil # copy2 +import signal +import subprocess +import sys +import threading +import typing +import urllib.parse # Decode XSPF URL formatted lines +import xml.etree.ElementTree # Read XSPF files + +# GUI +import tkinter as tk +import tkinter.filedialog as tkfd +import tkinter.scrolledtext as tkst + +# As of now only ffmpeg converter is supported. +CONVERTER_NAME: str = "ffmpeg" + +class LibraryTrack: + def __init__(self, trackDir: str, trackFile: str) -> None: + self.trackDir: str = trackDir # Absolute path. + self.trackFile: str = trackFile # With extension. + +class LibraryPlaylist: + def __init__(self, name: str) -> None: + self.name: str = name + self.tracks: list[LibraryTrack] = [] + + def add(self, trackDir: str, trackFile: str) -> None: + self.tracks.append(LibraryTrack(trackDir, trackFile)) + +class Task: + def __init__(self, taskType: str, converterArgs: list[str], + inputFilePath: str, outputFilePath: str) -> None: + self.taskType: str = taskType # Convert or copy. + self.converterArgs: list[str] = converterArgs + self.inputFilePath: str = inputFilePath + self.outputFilePath: str = outputFilePath + +class ConvertParams: + def __init__(self, playlistDir: str, targetDir: str, + convertProfile: str, converterPath: str, + trimNTrack: bool, jobs: int) -> None: + self.playlistDir: str = playlistDir + self.targetDir: str = targetDir + self.convertProfile: str = convertProfile + self.converterPath: str = converterPath + self.trimNTrack: bool = trimNTrack + self.jobs: int = jobs + +class ConvertProfile: + def __init__(self) -> None: + self.name: str = "none" + self.targetExtension: str = "" + self.convertExtensions: list[str] = list() + self.copyExtensions: list[str] = list() + self.converterArgs: list[str] = list() + + def init(self, profileName: str) -> bool: + # -n to not overwrite any existing file. + # -ar 44100 for 44.1 kHz audio output. + # -fps_mode vfr to drop frames which cannot be + # properly contained in the output file. + commonFlags: list[str] = ["-hide_banner", "-loglevel", "error", + "-n", "-ar", "44100", "-fps_mode", "vfr"] + + # -sample_fmt s16 for 16 bits audio output. + # -write_id3v2 1 to write metadata in aiff files. + if profileName == "aiff": + self.name = "aiff" + self.targetExtension = ".aiff" + self.convertExtensions = [".aiff", ".flac", ".wav"] + self.copyExtensions = [".mp3"] + self.converterArgs = (commonFlags + + ["-f", "aiff", "-sample_fmt", "s16", + "-write_id3v2", "1"]) + return True + elif profileName == "flac": + self.name = "flac" + self.targetExtension = ".flac" + self.convertExtensions = [".aiff", ".flac", ".wav"] + self.copyExtensions = [".mp3"] + self.converterArgs = (commonFlags + + ["-f", "flac", "-sample_fmt", "s16"]) + return True + # -q:a 0 to set mp3 variable bitrate. + elif profileName == "mp3": + self.name = "mp3" + self.targetExtension = ".mp3" + self.convertExtensions = [".aiff", ".flac", ".wav"] + self.copyExtensions = [".mp3"] + self.converterArgs = (commonFlags + + ["-f", "mp3", + "-c:a", "libmp3lame", "-q:a", "0"]) + return True + else: + return False + + def isTargetExtension(self, extension: str) -> bool: + return extension == self.targetExtension + + def isConvertExtension(self, extension: str) -> bool: + return extension in self.convertExtensions + + def isCopyExtension(self, extension: str) -> bool: + return extension in self.copyExtensions + +class Stats: + def __init__(self) -> None: + self.errors: int = 0 + self.converted: int = 0 + self.copied: int = 0 + self.removed: int = 0 + self.updated: int = 0 + self.duplicates: int = 0 + self.skipped: int = 0 + + def add(self, stats) -> None: + self.errors += stats.errors + self.converted += stats.converted + self.copied += stats.copied + self.removed += stats.removed + self.updated += stats.updated + self.duplicates += stats.duplicates + self.skipped += stats.skipped + +# Main converter class. Thread safe. +class Converter: + def __init__(self, outstreamFunction: typing.Callable[[str], None]) -> None: + self._running: bool = False + self._stopSignal: bool = False + self._runLock: threading.Lock = threading.Lock() + self._outstreamFunction: typing.Callable[[str], None] = outstreamFunction + self._logLock: threading.Lock = threading.Lock() + self._statsLock: threading.Lock = threading.Lock() + + self._TARGET_PLAYLIST_EXT: str = ".m3u" + # Used for temporary files made from copy and conversion. + self._TMP_EXT: str = ".tmp" + # As of now only ffmpeg converter is supported. + self._CONVERTER_CHECK: str = "-version" + self._CONVERTER_IN: str = "-i" + + # Check for unused albums. + def check(self, playlistDir: str, libraryDir: str) -> None: + if not self._start(): + return + + self._check(playlistDir, libraryDir) + self._end() + + def _check(self, playlistDir: str, libraryDir: str) -> None: + if not os.path.isdir(playlistDir): + self._log("Error: Invalid playlist directory '" + + playlistDir + "'.") + return + + if not os.path.isdir(libraryDir): + self._log("Error: Invalid library directory '" + + libraryDir + "'.") + return + + # List of albums/directories from library directory. + # Absolute paths are used so each album is considered unique. + albums: set[str] = set() + # Key is album/directory (absolute path). + # Value is how many times a track from this album is used. + presenceMap: dict[str, int] = {} + + # Initialize album list. + for r, d, f in os.walk(libraryDir): + if self._getStopSignal() == True: + return + + if len(d) == 0: # No sub directory means it's an album directory. + albums.add(os.path.abspath(r)) + + for playlistFile in os.listdir(playlistDir): + if self._getStopSignal() == True: + return + + libPlaylist = self._createLibraryPlaylistFromFile(os.path.join(playlistDir, + playlistFile)) + for track in libPlaylist.tracks: + trackFilePath = os.path.join(track.trackDir, + track.trackFile) + + if not os.path.isfile(trackFilePath): + self._log("Warning: '" + trackFilePath + + "' in playlist '" + libPlaylist.name + "' not found.") + + # If an album contains a track from any playlist remove it. + if track.trackDir in albums: + albums.remove(track.trackDir) + + # Print remaining unused albums. + if len(albums) > 0: + self._log("Unused albums:") + + for album in sorted(albums): + self._log(album) + + # Convert playlist content to target directory. + def convert(self, convertParams: ConvertParams) -> None: + if not self._start(): + return + + self._convert(convertParams) + self._end() + + def _convert(self, convertParams: ConvertParams) -> None: + if not os.path.isdir(convertParams.playlistDir): + self._log("Error: Invalid playlist directory '" + + convertParams.playlistDir + "'.") + return + + if not os.path.isdir(convertParams.targetDir): + self._log("Error: Invalid target directory '" + + convertParams.targetDir + "'.") + return + + convertProfile: ConvertProfile = ConvertProfile() + + if not convertProfile.init(convertParams.convertProfile): + self._log("Error: Invalid convert profile '" + + convertParams.convertProfile + "'.") + return + + if not self.checkConverter(convertParams.converterPath, + self._CONVERTER_CHECK): + self._log("Error: Could not start converter '" + + convertParams.converterPath + "'.") + return + + if convertParams.jobs < 1: + self._log("Error: Number of concurrent jobs must be at least 1.") + return + + # Playlists generated from playlist files. + libPlaylists: dict[str, LibraryPlaylist] = self._createLibraryPlaylists(convertParams.playlistDir) + + if len(libPlaylists) == 0: + self._log("No playlist found.") + return + + # Existing files in target directory to be removed if not found in playlists. + unmatchedFiles: set[str] = set() + # Processed files. To handle duplicates (same file from multiple playlists). + processedFiles: set[str] = set() + # Existing playlists in target directory to be removed + # if not found in original playlist directory. + unmatchedPlaylistFiles: set[str] = set() + # Total playlist track count. + count: int = 0 + stats: Stats = Stats() + # Task queue. + q: queue.SimpleQueue = queue.SimpleQueue() + + # Build existing file sets from target directory. + for tFile in os.listdir(convertParams.targetDir): + if not os.path.isfile(os.path.join(convertParams.targetDir, + tFile)): + continue + + tExt: str = self.getFileExtension(tFile).lower() + + if (convertProfile.isTargetExtension(tExt) + or convertProfile.isCopyExtension(tExt)): + unmatchedFiles.add(tFile) + elif tExt == self._TARGET_PLAYLIST_EXT: + unmatchedPlaylistFiles.add(tFile) + # else ? File tFile not handled. + + # Build task queue and update playlist files. + for p in libPlaylists: + if self._getStopSignal() == True: + self._log("Stopped.") + return + + playlist: LibraryPlaylist = libPlaylists[p] + targetPlaylistFile: str = (playlist.name + + self._TARGET_PLAYLIST_EXT) + targetFileList: list[str] = [] + + if targetPlaylistFile in unmatchedPlaylistFiles: + unmatchedPlaylistFiles.remove(targetPlaylistFile) + + for track in playlist.tracks: + trackFilePath: str = os.path.join(track.trackDir, + track.trackFile) + + if not os.path.isfile(trackFilePath): + self._log("Error: '" + trackFilePath + "' in playlist '" + + playlist.name + "' is not a file.") + stats.errors += 1 + continue + + # Default task type: copy. + taskType: str = "copy" + targetName: str = self.trimFileExtension(track.trackFile) + targetExt: str = self.getFileExtension(track.trackFile).lower() + + # Convert. + if convertProfile.isConvertExtension(targetExt): + targetExt = convertProfile.targetExtension + taskType = "convert" + # Do nothing if extension not found in profile. + elif not convertProfile.isCopyExtension(targetExt): + self._log("Error: Unsupported track file '" + + trackFilePath + "' in playlist '" + + playlist.name + "' for profile '" + + convertProfile.name + "'.") + stats.errors += 1 + continue + + targetFileName: str = targetName + + # Replace m3u comment character. + targetFileName = targetFileName.replace('#', '_') + + if convertParams.trimNTrack: + targetFileName = self.trimTrackFileName(targetFileName) + + targetFile: str = targetFileName + targetExt + targetFilePath: str = os.path.join(convertParams.targetDir, + targetFile) + + targetFileList.append(targetFile) + + # Target already exists. + if targetFile in unmatchedFiles: + unmatchedFiles.remove(targetFile) + + if not os.path.isfile(targetFilePath): + self._log("Error: Existing target element '" + + targetFilePath + "' is not a valid file.") + stats.errors += 1 + elif self.isFileChanged(trackFilePath, targetFilePath): + processedFiles.add(targetFile) + q.put(Task(taskType, convertProfile.converterArgs, + trackFilePath, targetFilePath)) + else: + processedFiles.add(targetFile) + stats.skipped += 1 + # Target already processed. + elif targetFile in processedFiles: + stats.duplicates +=1 + # Target does not exist. + else: + processedFiles.add(targetFile) + q.put(Task(taskType, convertProfile.converterArgs, + trackFilePath, targetFilePath)) + # /for track in playlist.tracks + + count += len(playlist.tracks) + + # Replace existing playlist file. + self._updatePlaylistFile(targetFileList, + os.path.join(convertParams.targetDir, targetPlaylistFile)) + # /for playlist in libPlaylists + + if self._getStopSignal() == True: + self._log("Stopped.") + return + + # Delete non matched playlists. + for f in unmatchedPlaylistFiles: + self._log("Removing non matched playlist '" + f + "'.") + os.remove(os.path.join(convertParams.targetDir, f)) + + # Delete non matched tracks. + for f in unmatchedFiles: + self._log("Removing non matched file '" + f + "'.") + os.remove(os.path.join(convertParams.targetDir, f)) + stats.removed += 1 + + threads: list[threading.Thread] = [] + + for i in range(0, convertParams.jobs - 1): + # Create worker thread. + threads.append(threading.Thread(target=self._processQueue, + args=(q, convertParams.converterPath, stats))) + + # A thread stops when it gets None from queue. + q.put(None) + + threads[i].start() + + # Join workers. + q.put(None) + self._processQueue(q, convertParams.converterPath, stats) + + # Wait for workers. + for i in range(0, len(threads)): + threads[i].join() + + self._log("From " + str(count) + " elements in " + + str(len(libPlaylists)) + " playlists:" + + " Errors " + str(stats.errors) + + " Converted " + str(stats.converted) + + " Copied " + str(stats.copied) + + " Updated " + str(stats.updated) + + " Removed " + str(stats.removed) + + " Duplicates " + str(stats.duplicates) + + " Skipped " + str(stats.skipped) + ".") + + # Main function started by threads. Process queue elements + # until 'None' is found or stop signal is received. + def _processQueue(self, q: queue.SimpleQueue, + converterPath: str, stats: Stats) -> None: + localStats: Stats = Stats() + + # Process elements until getting 'None'. + while True: + queueElement: Task = q.get() + + if queueElement == None or self._getStopSignal() == True: + break + + self._processTask(queueElement, converterPath, localStats) + #q.task_done() + + # Update global stats. + self._statsLock.acquire() + stats.add(localStats) + self._statsLock.release() + + # Copy or convert a music file. + def _processTask(self, task: Task, converterPath: str, + stats: Stats) -> None: + if not os.path.isfile(task.inputFilePath): + self._log("Error: Playlist element '" + task.inputFilePath + + "' is not a file.") + stats.errors += 1 + return + + update: bool = os.path.isfile(task.outputFilePath) + outputFilePathWip: str = task.outputFilePath + self._TMP_EXT + + for f in [task.outputFilePath, outputFilePathWip]: + if os.path.isfile(f): + self._log("Removing outdated file '" + f + "'.") + os.remove(f) + elif os.path.exists(f): + self._log("Error: Existing non file element'" + f + "'.") + stats.errors += 1 + return + + if task.taskType == "copy": + self._log("Copying '" + task.inputFilePath + + "' into '" + task.outputFilePath + "'.") + self.copyFile(task.inputFilePath, outputFilePathWip) + os.rename(outputFilePathWip, task.outputFilePath) + stats.copied += 1 + + if update: + stats.updated +=1 + elif task.taskType == "convert": + command: list[str] = ([converterPath] + [self._CONVERTER_IN] + + [task.inputFilePath] + task.converterArgs + + [outputFilePathWip]) + self._log("Converting '" + task.inputFilePath + "' to '" + + task.outputFilePath + "'.") + result: subprocess.CompletedProcess = subprocess.run(command, capture_output=True, text=True) + + if result.returncode == 0: + os.rename(outputFilePathWip, task.outputFilePath) + stats.converted += 1 + + if update: + stats.updated +=1 + else: + self._log("Error from converter: " + result.stderr) + stats.errors += 1 + else: # Unknown task + self._log("Error: Skipping non valid task type '" + task.taskType + + "' file '" + task.inputFilePath + "'.") + stats.errors += 1 + + # Create library playlists from a base directory. + def _createLibraryPlaylists(self, directory: str) -> dict[str, LibraryPlaylist]: + libPlaylists: dict[str, LibraryPlaylist] = {} + + if not os.path.isdir(directory): + self._log("Warning: '" + directory + "' is not a valid directory.") + else: + for element in os.listdir(directory): + path: str = os.path.join(directory, element) + + if not os.path.isfile(path): + continue + + playlistName: str = self.trimFileExtension(element) + + if playlistName in libPlaylists: + self._log("Warning: Duplicate playlist name '" + + playlistName + "'.") + else: + libPlaylist: LibraryPlaylist = self._createLibraryPlaylistFromFile(path) + + if len(libPlaylist.tracks) == 0: + self._log("Warning: Empty library playlist '" + + libPlaylist.name + "'.") + else: + libPlaylists[playlistName] = libPlaylist + + return libPlaylists + + # Create a library playlist from a playlist file. + def _createLibraryPlaylistFromFile(self, filePath: str) -> LibraryPlaylist: + playlist: LibraryPlaylist = LibraryPlaylist(self.trimFileExtension(os.path.basename(filePath))) + + if (self.getFileExtension(filePath).lower() == ".m3u" + or self.getFileExtension(filePath).lower() == ".m3u8"): + self._loadTracksFromM3u(filePath, playlist) + elif self.getFileExtension(filePath).lower() == ".xspf": + self._loadTracksFromXspf(filePath, playlist) + else: + self._log("Warning: Unsupported playlist extension for file '" + + filePath + "'.") + + return playlist + + # Read m3u playlist file to get all its tracks. + def _loadTracksFromM3u(self, filePath: str, playlist: LibraryPlaylist) -> None: + m3uFile: typing.IO[str] = self.openFile(filePath, "rt") + + # Each line is an string path. + for line in m3uFile: + string: str = line.rstrip() + + # Starting with '#' means it's a comment. + if len(string) == 0 or string[0] == "#": + continue + + playlist.add(os.path.dirname(string), os.path.basename(string)) + + # Read xspf playlist file to get all its tracks. + def _loadTracksFromXspf(self, filePath: str, playlist: LibraryPlaylist) -> None: + xspf: xml.etree.ElementTree.Element = xml.etree.ElementTree.parse(filePath).getroot() + xmlns: str = "http://xspf.org/ns/0/" + + # Each element is a string path. + for track in xspf[0]: # in + element: typing.Optional[xml.etree.ElementTree.Element] = track.find("{" + xmlns + "}location") + + if element is None: + continue + + urlDecodedString: str = urllib.parse.unquote(str(element.text)) + playlist.add(os.path.dirname(urlDecodedString), + os.path.basename(urlDecodedString)) + + # Create or update playlist file from a directory. + def _updatePlaylistFile(self, playlist: list[str], playlistPath: str) -> None: + oriPlFilePath: str = playlistPath + repPlFilePath: str = oriPlFilePath + self._TMP_EXT + + self._buildM3uPlaylistFile(playlist, repPlFilePath) + + # Replace old playlist file if content is different. + if os.path.isfile(oriPlFilePath): + # Do nothing if new playlist file is identical to the previous one. + if self.isFileIdentical(oriPlFilePath, repPlFilePath): + os.remove(repPlFilePath) + return + + self._log("Updating outdated playlist file '" + oriPlFilePath + "'.") + os.remove(oriPlFilePath) + else: + self._log("Adding playlist file '" + oriPlFilePath + "'.") + + os.rename(repPlFilePath, oriPlFilePath) + + # Build a m3u file from a playlist. + def _buildM3uPlaylistFile(self, playlist: list[str], filePath: str) -> None: + plFile: typing.IO[str] = self.openFile(filePath, "wt") + + # Build new playlist file. + for track in playlist: + plFile.write(track + "\n") + + plFile.close() + + # Prevent the same converter from running twice. + def _start(self) -> bool: + result: bool = False + self._runLock.acquire() + + if self._running: + self._log("Error: This conveter is already running.") + else: + self._running = True + self._stopSignal = False + result = True + + self._runLock.release() + return result + + def _end(self) -> None: + self._runLock.acquire() + self._running = False + self._runLock.release() + + # Called from external thread. + def stop(self) -> None: + self._log("Received stop signal. Stopping ongoing operations...") + + self._runLock.acquire() + self._stopSignal = True + self._runLock.release() + + def _getStopSignal(self) -> bool: + self._runLock.acquire() + result = self._stopSignal + self._runLock.release() + return result + + def _log(self, message: str) -> None: + self._logLock.acquire() + self._outstreamFunction(message) + self._logLock.release() + + # Trim file name to remove leader track number. + def trimTrackFileName(self, trackFileName: str) -> str: + n: int = 0 + + # Go past leading numbers. + for e in trackFileName: + if e.isdigit(): + n = n + 1 + else: + break + + # NN - NAME, NN . NAME + if (len(trackFileName) > n + 3 + and trackFileName[n:n+3] in [" - ", " . "]): + return trackFileName[n+3:] + # NN- NAME, NN -NAME, NN. NAME, NN .NAME + elif (len(trackFileName) > n + 2 + and trackFileName[n:n+2] in ["- ", " -", ". ", " ."]): + return trackFileName[n+2:] + # NN-NAME, NN.NAME, NN NAME + elif (len(trackFileName) > n + 1 + and trackFileName[n:n+1] in ["-", ".", " "]): + return trackFileName[n+1:] + else: + return trackFileName + + def trimFileExtension(self, filePath: str) -> str: + return os.path.splitext(filePath)[0] + + def getFileExtension(self, filePath: str) -> str: + return os.path.splitext(filePath)[1] + + def openFile(self, path: str, args: str) -> typing.IO[str]: + return open(path, args, encoding="utf8") + + def makeDir(self, dirPath: str) -> None: + os.makedirs(dirPath, exist_ok=True) + + def copyFile(self, oriFilePath: str, newFilePath: str) -> None: + shutil.copy2(oriFilePath, newFilePath) + + # Compare last modification date. + def isFileChanged(self, sourceFilePath: str, + targetFilePath: str) -> bool: + return (os.path.getmtime(sourceFilePath) + > os.path.getmtime(targetFilePath)) + + def isFileIdentical(self, firstFilePath: str, + secondFilePath: str) -> bool: + return filecmp.cmp(firstFilePath, secondFilePath, + shallow=False) + + def checkConverter(self, converterPath: str, + converterArg: str) -> bool: + try: + return subprocess.run([converterPath, converterArg], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL).returncode == 0 + except: + return False; +# /Converter + +class GUI: + def __init__(self) -> None: + self.converter: Converter = Converter(self.log) + + self.rootWindow: tk.Tk = tk.Tk() + self.rootWindow.title("Music Library Converter") + + self.pickGrid: tk.Frame = tk.Frame(self.rootWindow) + + self.plDirLabel: tk.Label = tk.Label(self.pickGrid, text="Playlists directory") + self.plDirEntry: tk.Entry = tk.Entry(self.pickGrid) + self.plDirButton: tk.Button = tk.Button(self.pickGrid, + command=lambda: self.setDirEntryFromPicker(self.plDirEntry), text="Browse") + + self.targetDirLabel: tk.Label = tk.Label(self.pickGrid, text="Target directory") + self.targetDirEntry: tk.Entry = tk.Entry(self.pickGrid) + self.targetDirButton: tk.Button = tk.Button(self.pickGrid, + command=lambda: self.setDirEntryFromPicker(self.targetDirEntry), text="Browse") + + self.converterPathLabel: tk.Label = tk.Label(self.pickGrid, text="Converter path") + self.converterPathEntry: tk.Entry = tk.Entry(self.pickGrid) + self.converterPathEntry.insert(tk.INSERT, CONVERTER_NAME) + self.converterPathButton: tk.Button = tk.Button(self.pickGrid, + command=lambda: self.setFileEntryFromPicker(self.converterPathEntry), text="Browse") + + self.runFrame: tk.Frame = tk.Frame(self.rootWindow) + + self.runButton: tk.Button = tk.Button(self.runFrame, + command=self.run, text="Run Configuration") + + self.cancelButton: tk.Button = tk.Button(self.runFrame, + command=self.cancel, text="Cancel") + self.cancelButton["state"] = "disabled" + + self.actionLabel: tk.Label = tk.Label(self.runFrame, + text="Action") + self.action: tk.StringVar = tk.StringVar(self.runFrame) + self.action.set("convert") + self.actionOption: tk.OptionMenu = tk.OptionMenu(self.runFrame, + self.action, "convert", "check") + self.actionOption.config(width=8) + + self.profileLabel: tk.Label = tk.Label(self.runFrame, + text="Profile") + self.profile: tk.StringVar = tk.StringVar(self.runFrame) + self.profile.set("mp3") + self.profileOption: tk.OptionMenu = tk.OptionMenu(self.runFrame, + self.profile, "mp3", "aiff", "flac") + self.profileOption.config(width=5) + + self.trim: tk.IntVar = tk.IntVar(value=1) + self.trimCheck: tk.Checkbutton = tk.Checkbutton(self.runFrame, + text="Trim track N", variable=self.trim) + + self.jobsLabel: tk.Label = tk.Label(self.runFrame, + text="Jobs") + self.jobs: tk.StringVar = tk.StringVar(value="1") + self.jobsOption: tk.OptionMenu = tk.OptionMenu(self.runFrame, + self.jobs, "1", "2", "3", "4", "5", "6", "7", "8", "9", "10") + + self.logOutput: tkst.ScrolledText = tkst.ScrolledText(self.rootWindow) + self.logOutput.configure(state="disabled") + self.logLock: threading.Lock = threading.Lock() + + def init(self) -> None: + self.rootWindow.grid_columnconfigure(0, weight=1) + self.rootWindow.grid_rowconfigure(2, weight=1) + + self.pickGrid.grid(row=0, pady=5, sticky=tk.N+tk.E+tk.S+tk.W) + self.pickGrid.grid_columnconfigure(2, weight=1) + + self.plDirLabel.grid(row=0, column=0, padx=5, sticky=tk.W) + self.plDirButton.grid(row=0, column=1, padx=5, sticky=tk.W) + self.plDirEntry.grid(row=0, column=2, padx=5, sticky=tk.E+tk.W) + self.targetDirLabel.grid(row=1, column=0, padx=5, sticky=tk.W) + self.targetDirButton.grid(row=1, column=1, padx=5, sticky=tk.W) + self.targetDirEntry.grid(row=1, column=2, padx=5, sticky=tk.E+tk.W) + self.converterPathLabel.grid(row=2, column=0, padx=5, sticky=tk.W) + self.converterPathButton.grid(row=2, column=1, padx=5, sticky=tk.W) + self.converterPathEntry.grid(row=2, column=2, padx=5, sticky=tk.E+tk.W) + + self.runFrame.grid(row=1, sticky=tk.N+tk.S+tk.W) + + self.runButton.pack(side=tk.LEFT, padx=(5, 5)) + self.cancelButton.pack(side=tk.LEFT, padx=(0, 5)) + self.actionLabel.pack(side=tk.LEFT, padx=(0, 5)) + self.actionOption.pack(side=tk.LEFT, padx=(0, 5)) + self.profileLabel.pack(side=tk.LEFT, padx=(0, 5)) + self.profileOption.pack(side=tk.LEFT, padx=(0, 5)) + self.trimCheck.pack(side=tk.LEFT, padx=(0, 5)) + self.jobsLabel.pack(side=tk.LEFT, padx=(0, 5)) + self.jobsOption.pack(side=tk.LEFT, padx=(0, 5)) + + self.logOutput.grid(row=2, padx=5, pady=5, sticky=tk.N+tk.E+tk.S+tk.W) + + self.rootWindow.mainloop() + + def run(self) -> None: + self.plDirButton["state"] = "disabled" + self.plDirEntry["state"] = "disabled" + self.targetDirButton["state"] = "disabled" + self.targetDirEntry["state"] = "disabled" + self.converterPathButton["state"] = "disabled" + self.converterPathEntry["state"] = "disabled" + self.runButton["state"] = "disabled" + self.actionOption["state"] = "disabled" + self.profileOption["state"] = "disabled" + self.trimCheck["state"] = "disabled" + self.jobsOption["state"] = "disabled" + self.cancelButton["state"] = "normal" + + # Clear log. + self.logOutput.configure(state="normal") + self.logOutput.delete("1.0", tk.END) + self.logOutput.configure(state="disabled") + + thread: threading.Thread = threading.Thread(target=self.process) + thread.start() + + def cancel(self) -> None: + self.cancelButton["state"] = "disabled" + self.converter.stop() + + def process(self) -> None: + if self.action.get() == "check": + self.converter.check(self.plDirEntry.get(), self.targetDirEntry.get()) + else: + self.converter.convert( + ConvertParams(self.plDirEntry.get(), self.targetDirEntry.get(), + self.profile.get(), self.converterPathEntry.get(), + bool(self.trim.get()), int(self.jobs.get()))) + + # Processing is done. + self.plDirButton["state"] = "normal" + self.plDirEntry["state"] = "normal" + self.targetDirButton["state"] = "normal" + self.targetDirEntry["state"] = "normal" + self.converterPathButton["state"] = "normal" + self.converterPathEntry["state"] = "normal" + self.runButton["state"] = "normal" + self.actionOption["state"] = "normal" + self.profileOption["state"] = "normal" + self.trimCheck["state"] = "normal" + self.jobsOption["state"] = "normal" + self.cancelButton["state"] = "disabled" + + def log(self, message: str) -> None: + self.logLock.acquire() + self.logOutput.configure(state="normal") + + if len(message) > 0 and message[-1] == "\n": + self.logOutput.insert(tk.END, message) + else: + self.logOutput.insert(tk.END, message + "\n") + + #self.logOutput.see(tk.END) Causes segfault + self.logOutput.configure(state="disabled") + self.logLock.release() + + def setDirEntryFromPicker(self, entry) -> None: + choice: str = tkfd.askdirectory() + + if len(choice) != 0: + entry.delete(0, tk.END) + entry.insert(tk.INSERT, choice) + + def setFileEntryFromPicker(self, entry) -> None: + choice: str = tkfd.askopenfilename() + + if len(choice) != 0: + entry.delete(0, tk.END) + entry.insert(tk.INSERT, choice) +# /GUI + +def cli_print_message(message: str) -> None: + if len(message) > 0 and message[-1] == "\n": + print(message, end="") + else: + print(message) + +def cli_main() -> None: + parser: argparse.ArgumentParser = argparse.ArgumentParser( + description="Music library converter", + formatter_class=argparse.ArgumentDefaultsHelpFormatter) + + parser.add_argument("playlist-dir", help="Playlist directory", + type=str) + parser.add_argument("target-dir", help="Target directory", + type=str) + parser.add_argument("--profile", "-p", help="Conversion profile", + type=str, default="mp3") + parser.add_argument("--converter", "-c", help="Converter path", + type=str, default=CONVERTER_NAME) + parser.add_argument("--trim", "-t", help="Trim track numbers in file names", + action="store_true") + parser.add_argument("--jobs", "-j", help="Number of concurrent jobs", + type=int, default=1) + parser.add_argument("--check", help="Check mode", action="store_true") + + args: dict = vars(parser.parse_args()) + converter: Converter = Converter(cli_print_message) + + def cli_stop_signal(signal, frame): + converter.stop() + + signal.signal(signal.SIGINT, cli_stop_signal) + + if args["check"] == True: + converter.check(args["playlist-dir"], args["target-dir"]) + else: + converter.convert(ConvertParams(args["playlist-dir"], args["target-dir"], + args["profile"], args["converter"], + args["trim"], args["jobs"])) + +def gui_main() -> None: + gui: GUI = GUI() + gui.init() + +if __name__ == "__main__": + if len(sys.argv) <= 1: + gui_main() + else: + cli_main() -- 2.47.3