]> git.poda.fr Git - converter.git/commitdiff
Initial commit on new server
authorJulien <julien@poda.fr>
Fri, 9 Aug 2024 23:35:03 +0000 (01:35 +0200)
committerJulien <julien@poda.fr>
Fri, 9 Aug 2024 23:35:03 +0000 (01:35 +0200)
README.md [new file with mode: 0644]
converter.py [new file with mode: 0755]

diff --git a/README.md b/README.md
new file mode 100644 (file)
index 0000000..c08164b
--- /dev/null
+++ b/README.md
@@ -0,0 +1,54 @@
+# Music Library Converter
+
+Convert and copy music files and playlists to a single directory.
+
+This directory is to be used as DJ software or phone music library.
+
+**Python and ffmpeg are required to run this application.**
+
+**Input**: Playlist directory containing playlist files.
+
+**Output**: Target directory containing converted files with desired extension and associated playlist files.
+
+### Examples
+
+Using GUI:
+
+`convert.py`
+
+Using CLI:
+
+`convert.py C:\PlaylistDirectory C:\TargetConvertedDirectory`
+
+`convert.py --profile aiff --jobs 4 --trim --converter C:\ffmpeg\bin\ffmpeg.exe C:\PlaylistDirectory C:\TargetDirectory`
+
+`convert.py -p flac -j 8 -t -c C:\ffmpeg\bin\ffmpeg.exe C:\PlaylistDirectory C:\TargetDirectory`
+
+### Supported library playlist extensions
+
+- **m3u**
+- **xspf**
+
+### Supported extension profiles
+
+- **mp3** (default)
+- **aiff**
+- **flac**
+
+### Supported target playlist extensions
+
+- **m3u**
+
+### Options
+
+- **converter**: Path to music file converter. For example `C:\ffmpeg\bin\ffmpeg.exe`.
+- **trim**: Trim track numbers from file names.
+- **jobs**: Number of parallel tasks to run (Multithreading).
+
+Music files are only replaced if their library version is more recent than their target directory version.
+
+Target directory playlist files are only replaced if their content changed.
+
+All files are converted with an external transcoder. As of now the only supported one is **ffmpeg**: https://ffmpeg.org/
+
+No data is ever written outside the target directory. Transcodes are only performed on lossless files, other files are simply copied.
diff --git a/converter.py b/converter.py
new file mode 100755 (executable)
index 0000000..2b57f34
--- /dev/null
@@ -0,0 +1,935 @@
+#!/usr/bin/env python
+
+'''
+BSD 3 License
+Copyright 2024 Julien Selamme
+
+Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
+
+1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the distribution.
+
+3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote products derived
+from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
+OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
+OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+'''
+
+import argparse
+import filecmp # diff
+import os
+import queue
+import shutil # copy2
+import signal
+import subprocess
+import sys
+import threading
+import typing
+import urllib.parse # Decode XSPF URL formatted lines
+import xml.etree.ElementTree # Read XSPF files
+
+# GUI
+import tkinter as tk
+import tkinter.filedialog as tkfd
+import tkinter.scrolledtext as tkst
+
+# As of now only ffmpeg converter is supported.
+CONVERTER_NAME: str = "ffmpeg"
+
+class LibraryTrack:
+    def __init__(self, trackDir: str, trackFile: str) -> None:
+        self.trackDir: str = trackDir # Absolute path.
+        self.trackFile: str = trackFile # With extension.
+
+class LibraryPlaylist:
+    def __init__(self, name: str) -> None:
+        self.name: str = name
+        self.tracks: list[LibraryTrack] = []
+
+    def add(self, trackDir: str, trackFile: str) -> None:
+        self.tracks.append(LibraryTrack(trackDir, trackFile))
+
+class Task:
+    def __init__(self, taskType: str, converterArgs: list[str],
+                 inputFilePath: str, outputFilePath: str) -> None:
+        self.taskType: str = taskType # Convert or copy.
+        self.converterArgs: list[str] = converterArgs
+        self.inputFilePath: str = inputFilePath
+        self.outputFilePath: str = outputFilePath
+
+class ConvertParams:
+    def __init__(self, playlistDir: str, targetDir: str,
+                 convertProfile: str, converterPath: str,
+                 trimNTrack: bool, jobs: int) -> None:
+        self.playlistDir: str = playlistDir
+        self.targetDir: str = targetDir
+        self.convertProfile: str = convertProfile
+        self.converterPath: str = converterPath
+        self.trimNTrack: bool = trimNTrack
+        self.jobs: int = jobs
+
+class ConvertProfile:
+    def __init__(self) -> None:
+        self.name: str = "none"
+        self.targetExtension: str = ""
+        self.convertExtensions: list[str] = list()
+        self.copyExtensions: list[str] = list()
+        self.converterArgs: list[str] = list()
+
+    def init(self, profileName: str) -> bool:
+        # -n to not overwrite any existing file.
+        # -ar 44100 for 44.1 kHz audio output.
+        # -fps_mode vfr to drop frames which cannot be
+        #               properly contained in the output file.
+        commonFlags: list[str] = ["-hide_banner", "-loglevel", "error",
+                                  "-n", "-ar", "44100", "-fps_mode", "vfr"]
+
+        # -sample_fmt s16 for 16 bits audio output.
+        # -write_id3v2 1 to write metadata in aiff files.
+        if profileName == "aiff":
+            self.name = "aiff"
+            self.targetExtension = ".aiff"
+            self.convertExtensions = [".aiff", ".flac", ".wav"]
+            self.copyExtensions = [".mp3"]
+            self.converterArgs = (commonFlags
+                                  + ["-f", "aiff", "-sample_fmt", "s16",
+                                     "-write_id3v2", "1"])
+            return True
+        elif profileName == "flac":
+            self.name = "flac"
+            self.targetExtension = ".flac"
+            self.convertExtensions = [".aiff", ".flac", ".wav"]
+            self.copyExtensions = [".mp3"]
+            self.converterArgs = (commonFlags
+                                  + ["-f", "flac", "-sample_fmt", "s16"])
+            return True
+        # -q:a 0 to set mp3 variable bitrate.
+        elif profileName == "mp3":
+            self.name = "mp3"
+            self.targetExtension = ".mp3"
+            self.convertExtensions = [".aiff", ".flac", ".wav"]
+            self.copyExtensions = [".mp3"]
+            self.converterArgs = (commonFlags
+                                  + ["-f", "mp3",
+                                     "-c:a", "libmp3lame", "-q:a", "0"])
+            return True
+        else:
+            return False
+
+    def isTargetExtension(self, extension: str) -> bool:
+        return extension == self.targetExtension
+
+    def isConvertExtension(self, extension: str) -> bool:
+        return extension in self.convertExtensions
+
+    def isCopyExtension(self, extension: str) -> bool:
+        return extension in self.copyExtensions
+
+class Stats:
+    def __init__(self) -> None:
+        self.errors: int = 0
+        self.converted: int = 0
+        self.copied: int = 0
+        self.removed: int = 0
+        self.updated: int = 0
+        self.duplicates: int = 0
+        self.skipped: int = 0
+
+    def add(self, stats) -> None:
+        self.errors += stats.errors
+        self.converted += stats.converted
+        self.copied += stats.copied
+        self.removed += stats.removed
+        self.updated += stats.updated
+        self.duplicates += stats.duplicates
+        self.skipped += stats.skipped
+
+# Main converter class. Thread safe.
+class Converter:
+    def __init__(self, outstreamFunction: typing.Callable[[str], None]) -> None:
+        self._running: bool = False
+        self._stopSignal: bool = False
+        self._runLock: threading.Lock = threading.Lock()
+        self._outstreamFunction: typing.Callable[[str], None] = outstreamFunction
+        self._logLock: threading.Lock = threading.Lock()
+        self._statsLock: threading.Lock = threading.Lock()
+
+        self._TARGET_PLAYLIST_EXT: str = ".m3u"
+        # Used for temporary files made from copy and conversion.
+        self._TMP_EXT: str = ".tmp"
+        # As of now only ffmpeg converter is supported.
+        self._CONVERTER_CHECK: str = "-version"
+        self._CONVERTER_IN: str = "-i"
+
+    # Check for unused albums.
+    def check(self, playlistDir: str, libraryDir: str) -> None:
+        if not self._start():
+            return
+
+        self._check(playlistDir, libraryDir)
+        self._end()
+
+    def _check(self, playlistDir: str, libraryDir: str) -> None:
+        if not os.path.isdir(playlistDir):
+            self._log("Error: Invalid playlist directory '"
+                      + playlistDir + "'.")
+            return
+
+        if not os.path.isdir(libraryDir):
+            self._log("Error: Invalid library directory '"
+                      + libraryDir + "'.")
+            return
+
+        # List of albums/directories from library directory.
+        # Absolute paths are used so each album is considered unique.
+        albums: set[str] = set()
+        # Key is album/directory (absolute path).
+        # Value is how many times a track from this album is used.
+        presenceMap: dict[str, int] = {}
+
+        # Initialize album list.
+        for r, d, f in os.walk(libraryDir):
+            if self._getStopSignal() == True:
+                return
+
+            if len(d) == 0: # No sub directory means it's an album directory.
+                albums.add(os.path.abspath(r))
+
+        for playlistFile in os.listdir(playlistDir):
+            if self._getStopSignal() == True:
+                return
+
+            libPlaylist = self._createLibraryPlaylistFromFile(os.path.join(playlistDir,
+                                                                           playlistFile))
+            for track in libPlaylist.tracks:
+                trackFilePath = os.path.join(track.trackDir,
+                                             track.trackFile)
+
+                if not os.path.isfile(trackFilePath):
+                    self._log("Warning: '" + trackFilePath
+                              + "' in playlist '" + libPlaylist.name + "' not found.")
+
+                # If an album contains a track from any playlist remove it.
+                if track.trackDir in albums:
+                    albums.remove(track.trackDir)
+
+        # Print remaining unused albums.
+        if len(albums) > 0:
+            self._log("Unused albums:")
+
+            for album in sorted(albums):
+                self._log(album)
+
+    # Convert playlist content to target directory.
+    def convert(self, convertParams: ConvertParams) -> None:
+        if not self._start():
+            return
+
+        self._convert(convertParams)
+        self._end()
+
+    def _convert(self, convertParams: ConvertParams) -> None:
+        if not os.path.isdir(convertParams.playlistDir):
+            self._log("Error: Invalid playlist directory '"
+                      + convertParams.playlistDir + "'.")
+            return
+
+        if not os.path.isdir(convertParams.targetDir):
+            self._log("Error: Invalid target directory '"
+                      + convertParams.targetDir + "'.")
+            return
+
+        convertProfile: ConvertProfile = ConvertProfile()
+
+        if not convertProfile.init(convertParams.convertProfile):
+            self._log("Error: Invalid convert profile '"
+                      + convertParams.convertProfile + "'.")
+            return
+
+        if not self.checkConverter(convertParams.converterPath,
+                                   self._CONVERTER_CHECK):
+            self._log("Error: Could not start converter '"
+                      + convertParams.converterPath + "'.")
+            return
+
+        if convertParams.jobs < 1:
+            self._log("Error: Number of concurrent jobs must be at least 1.")
+            return
+
+        # Playlists generated from playlist files.
+        libPlaylists: dict[str, LibraryPlaylist] = self._createLibraryPlaylists(convertParams.playlistDir)
+
+        if len(libPlaylists) == 0:
+            self._log("No playlist found.")
+            return
+
+        # Existing files in target directory to be removed if not found in playlists.
+        unmatchedFiles: set[str] = set()
+        # Processed files. To handle duplicates (same file from multiple playlists).
+        processedFiles: set[str] = set()
+        # Existing playlists in target directory to be removed
+        # if not found in original playlist directory.
+        unmatchedPlaylistFiles: set[str] = set()
+        # Total playlist track count.
+        count: int = 0
+        stats: Stats = Stats()
+        # Task queue.
+        q: queue.SimpleQueue = queue.SimpleQueue()
+
+        # Build existing file sets from target directory.
+        for tFile in os.listdir(convertParams.targetDir):
+            if not os.path.isfile(os.path.join(convertParams.targetDir,
+                                               tFile)):
+                continue
+
+            tExt: str = self.getFileExtension(tFile).lower()
+
+            if (convertProfile.isTargetExtension(tExt)
+                or convertProfile.isCopyExtension(tExt)):
+                unmatchedFiles.add(tFile)
+            elif tExt == self._TARGET_PLAYLIST_EXT:
+                unmatchedPlaylistFiles.add(tFile)
+            # else ? File tFile not handled.
+
+        # Build task queue and update playlist files.
+        for p in libPlaylists:
+            if self._getStopSignal() == True:
+                self._log("Stopped.")
+                return
+
+            playlist: LibraryPlaylist = libPlaylists[p]
+            targetPlaylistFile: str = (playlist.name
+                                       + self._TARGET_PLAYLIST_EXT)
+            targetFileList: list[str] = []
+
+            if targetPlaylistFile in unmatchedPlaylistFiles:
+                unmatchedPlaylistFiles.remove(targetPlaylistFile)
+
+            for track in playlist.tracks:
+                trackFilePath: str = os.path.join(track.trackDir,
+                                                  track.trackFile)
+
+                if not os.path.isfile(trackFilePath):
+                    self._log("Error: '" + trackFilePath + "' in playlist '"
+                              + playlist.name + "' is not a file.")
+                    stats.errors += 1
+                    continue
+
+                # Default task type: copy.
+                taskType: str = "copy"
+                targetName: str = self.trimFileExtension(track.trackFile)
+                targetExt: str = self.getFileExtension(track.trackFile).lower()
+
+                # Convert.
+                if convertProfile.isConvertExtension(targetExt):
+                    targetExt = convertProfile.targetExtension
+                    taskType = "convert"
+                # Do nothing if extension not found in profile.
+                elif not convertProfile.isCopyExtension(targetExt):
+                    self._log("Error: Unsupported track file '"
+                              + trackFilePath + "' in playlist '"
+                              + playlist.name + "' for profile '"
+                              + convertProfile.name + "'.")
+                    stats.errors += 1
+                    continue
+
+                targetFileName: str = targetName
+
+                # Replace m3u comment character.
+                targetFileName = targetFileName.replace('#', '_')
+
+                if convertParams.trimNTrack:
+                    targetFileName = self.trimTrackFileName(targetFileName)
+
+                targetFile: str = targetFileName + targetExt
+                targetFilePath: str = os.path.join(convertParams.targetDir,
+                                                   targetFile)
+
+                targetFileList.append(targetFile)
+
+                # Target already exists.
+                if targetFile in unmatchedFiles:
+                    unmatchedFiles.remove(targetFile)
+
+                    if not os.path.isfile(targetFilePath):
+                        self._log("Error: Existing target element '"
+                                  + targetFilePath + "' is not a valid file.")
+                        stats.errors += 1
+                    elif self.isFileChanged(trackFilePath, targetFilePath):
+                        processedFiles.add(targetFile)
+                        q.put(Task(taskType, convertProfile.converterArgs,
+                                   trackFilePath, targetFilePath))
+                    else:
+                        processedFiles.add(targetFile)
+                        stats.skipped += 1
+                # Target already processed.
+                elif targetFile in processedFiles:
+                    stats.duplicates +=1
+                # Target does not exist.
+                else:
+                    processedFiles.add(targetFile)
+                    q.put(Task(taskType, convertProfile.converterArgs,
+                               trackFilePath, targetFilePath))
+            # /for track in playlist.tracks
+
+            count += len(playlist.tracks)
+
+            # Replace existing playlist file.
+            self._updatePlaylistFile(targetFileList,
+                 os.path.join(convertParams.targetDir, targetPlaylistFile))
+        # /for playlist in libPlaylists
+
+        if self._getStopSignal() == True:
+            self._log("Stopped.")
+            return
+
+        # Delete non matched playlists.
+        for f in unmatchedPlaylistFiles:
+            self._log("Removing non matched playlist '" + f + "'.")
+            os.remove(os.path.join(convertParams.targetDir, f))
+
+        # Delete non matched tracks.
+        for f in unmatchedFiles:
+            self._log("Removing non matched file '" + f + "'.")
+            os.remove(os.path.join(convertParams.targetDir, f))
+            stats.removed += 1
+
+        threads: list[threading.Thread] = []
+
+        for i in range(0, convertParams.jobs - 1):
+            # Create worker thread.
+            threads.append(threading.Thread(target=self._processQueue,
+                                            args=(q, convertParams.converterPath, stats)))
+
+            # A thread stops when it gets None from queue.
+            q.put(None)
+
+            threads[i].start()
+
+        # Join workers.
+        q.put(None)
+        self._processQueue(q, convertParams.converterPath, stats)
+
+        # Wait for workers.
+        for i in range(0, len(threads)):
+            threads[i].join()
+
+        self._log("From " + str(count) + " elements in "
+             + str(len(libPlaylists)) + " playlists:"
+             + " Errors " + str(stats.errors)
+             + " Converted " + str(stats.converted)
+             + " Copied " + str(stats.copied)
+             + " Updated " + str(stats.updated)
+             + " Removed " + str(stats.removed)
+             + " Duplicates " + str(stats.duplicates)
+             + " Skipped " + str(stats.skipped) + ".")
+
+    # Main function started by threads. Process queue elements
+    # until 'None' is found or stop signal is received.
+    def _processQueue(self, q: queue.SimpleQueue,
+                      converterPath: str, stats: Stats) -> None:
+        localStats: Stats = Stats()
+
+        # Process elements until getting 'None'.
+        while True:
+            queueElement: Task = q.get()
+
+            if queueElement == None or self._getStopSignal() == True:
+                break
+
+            self._processTask(queueElement, converterPath, localStats)
+            #q.task_done()
+
+        # Update global stats.
+        self._statsLock.acquire()
+        stats.add(localStats)
+        self._statsLock.release()
+
+    # Copy or convert a music file.
+    def _processTask(self, task: Task, converterPath: str,
+                     stats: Stats) -> None:
+        if not os.path.isfile(task.inputFilePath):
+            self._log("Error: Playlist element '" + task.inputFilePath
+                      + "' is not a file.")
+            stats.errors += 1
+            return
+
+        update: bool = os.path.isfile(task.outputFilePath)
+        outputFilePathWip: str = task.outputFilePath + self._TMP_EXT
+
+        for f in [task.outputFilePath, outputFilePathWip]:
+            if os.path.isfile(f):
+                self._log("Removing outdated file '" + f + "'.")
+                os.remove(f)
+            elif os.path.exists(f):
+                self._log("Error: Existing non file element'" + f + "'.")
+                stats.errors += 1
+                return
+
+        if task.taskType == "copy":
+            self._log("Copying '" + task.inputFilePath
+                      + "' into '" + task.outputFilePath + "'.")
+            self.copyFile(task.inputFilePath, outputFilePathWip)
+            os.rename(outputFilePathWip, task.outputFilePath)
+            stats.copied += 1
+
+            if update:
+                stats.updated +=1
+        elif task.taskType == "convert":
+            command: list[str] = ([converterPath] + [self._CONVERTER_IN]
+                                  + [task.inputFilePath] + task.converterArgs
+                                  + [outputFilePathWip])
+            self._log("Converting '" + task.inputFilePath + "' to '"
+                      + task.outputFilePath + "'.")
+            result: subprocess.CompletedProcess = subprocess.run(command, capture_output=True, text=True)
+
+            if result.returncode == 0:
+                os.rename(outputFilePathWip, task.outputFilePath)
+                stats.converted += 1
+
+                if update:
+                    stats.updated +=1
+            else:
+                self._log("Error from converter: " + result.stderr)
+                stats.errors += 1
+        else: # Unknown task
+            self._log("Error: Skipping non valid task type '" + task.taskType
+                      + "' file '" + task.inputFilePath + "'.")
+            stats.errors += 1
+
+    # Create library playlists from a base directory.
+    def _createLibraryPlaylists(self, directory: str) -> dict[str, LibraryPlaylist]:
+        libPlaylists: dict[str, LibraryPlaylist] = {}
+
+        if not os.path.isdir(directory):
+            self._log("Warning: '" + directory + "' is not a valid directory.")
+        else:
+            for element in os.listdir(directory):
+                path: str = os.path.join(directory, element)
+
+                if not os.path.isfile(path):
+                    continue
+
+                playlistName: str = self.trimFileExtension(element)
+
+                if playlistName in libPlaylists:
+                    self._log("Warning: Duplicate playlist name '"
+                              + playlistName + "'.")
+                else:
+                    libPlaylist: LibraryPlaylist = self._createLibraryPlaylistFromFile(path)
+
+                    if len(libPlaylist.tracks) == 0:
+                        self._log("Warning: Empty library playlist '"
+                                  + libPlaylist.name + "'.")
+                    else:
+                        libPlaylists[playlistName] = libPlaylist
+
+        return libPlaylists
+
+    # Create a library playlist from a playlist file.
+    def _createLibraryPlaylistFromFile(self, filePath: str) -> LibraryPlaylist:
+        playlist: LibraryPlaylist = LibraryPlaylist(self.trimFileExtension(os.path.basename(filePath)))
+
+        if (self.getFileExtension(filePath).lower() == ".m3u"
+            or self.getFileExtension(filePath).lower() == ".m3u8"):
+            self._loadTracksFromM3u(filePath, playlist)
+        elif self.getFileExtension(filePath).lower() == ".xspf":
+            self._loadTracksFromXspf(filePath, playlist)
+        else:
+            self._log("Warning: Unsupported playlist extension for file '"
+                      + filePath + "'.")
+
+        return playlist
+
+    # Read m3u playlist file to get all its tracks.
+    def _loadTracksFromM3u(self, filePath: str, playlist: LibraryPlaylist) -> None:
+        m3uFile: typing.IO[str] = self.openFile(filePath, "rt")
+
+        # Each line is an string path.
+        for line in m3uFile:
+            string: str = line.rstrip()
+
+            # Starting with '#' means it's a comment.
+            if len(string) == 0 or string[0] == "#":
+                continue
+
+            playlist.add(os.path.dirname(string), os.path.basename(string))
+
+    # Read xspf playlist file to get all its tracks.
+    def _loadTracksFromXspf(self, filePath: str, playlist: LibraryPlaylist) -> None:
+        xspf: xml.etree.ElementTree.Element = xml.etree.ElementTree.parse(filePath).getroot()
+        xmlns: str = "http://xspf.org/ns/0/"
+
+        # Each element is a string path.
+        for track in xspf[0]: # <track> in <tracks>
+            element: typing.Optional[xml.etree.ElementTree.Element] = track.find("{" + xmlns + "}location")
+
+            if element is None:
+                continue
+
+            urlDecodedString: str = urllib.parse.unquote(str(element.text))
+            playlist.add(os.path.dirname(urlDecodedString),
+                         os.path.basename(urlDecodedString))
+
+    # Create or update playlist file from a directory.
+    def _updatePlaylistFile(self, playlist: list[str], playlistPath: str) -> None:
+        oriPlFilePath: str = playlistPath
+        repPlFilePath: str = oriPlFilePath + self._TMP_EXT
+
+        self._buildM3uPlaylistFile(playlist, repPlFilePath)
+
+        # Replace old playlist file if content is different.
+        if os.path.isfile(oriPlFilePath):
+            # Do nothing if new playlist file is identical to the previous one.
+            if self.isFileIdentical(oriPlFilePath, repPlFilePath):
+                os.remove(repPlFilePath)
+                return
+
+            self._log("Updating outdated playlist file '" + oriPlFilePath + "'.")
+            os.remove(oriPlFilePath)
+        else:
+            self._log("Adding playlist file '" + oriPlFilePath + "'.")
+
+        os.rename(repPlFilePath, oriPlFilePath)
+
+    # Build a m3u file from a playlist.
+    def _buildM3uPlaylistFile(self, playlist: list[str], filePath: str) -> None:
+        plFile: typing.IO[str] = self.openFile(filePath, "wt")
+
+        # Build new playlist file.
+        for track in playlist:
+            plFile.write(track + "\n")
+
+        plFile.close()
+
+    # Prevent the same converter from running twice.
+    def _start(self) -> bool:
+        result: bool = False
+        self._runLock.acquire()
+
+        if self._running:
+            self._log("Error: This conveter is already running.")
+        else:
+            self._running = True
+            self._stopSignal = False
+            result = True
+
+        self._runLock.release()
+        return result
+
+    def _end(self) -> None:
+        self._runLock.acquire()
+        self._running = False
+        self._runLock.release()
+
+    # Called from external thread.
+    def stop(self) -> None:
+        self._log("Received stop signal. Stopping ongoing operations...")
+
+        self._runLock.acquire()
+        self._stopSignal = True
+        self._runLock.release()
+
+    def _getStopSignal(self) -> bool:
+        self._runLock.acquire()
+        result = self._stopSignal
+        self._runLock.release()
+        return result
+
+    def _log(self, message: str) -> None:
+        self._logLock.acquire()
+        self._outstreamFunction(message)
+        self._logLock.release()
+
+    # Trim file name to remove leader track number.
+    def trimTrackFileName(self, trackFileName: str) -> str:
+        n: int = 0
+
+        # Go past leading numbers.
+        for e in trackFileName:
+            if e.isdigit():
+                n = n + 1
+            else:
+                break
+
+        # NN - NAME, NN . NAME
+        if (len(trackFileName) > n + 3
+            and trackFileName[n:n+3] in [" - ",  " . "]):
+            return trackFileName[n+3:]
+        # NN- NAME, NN -NAME, NN. NAME, NN .NAME
+        elif (len(trackFileName) > n + 2
+              and trackFileName[n:n+2] in ["- ", " -", ". ", " ."]):
+            return trackFileName[n+2:]
+        # NN-NAME, NN.NAME, NN NAME
+        elif (len(trackFileName) > n + 1
+              and trackFileName[n:n+1] in ["-", ".", " "]):
+            return trackFileName[n+1:]
+        else:
+            return trackFileName
+
+    def trimFileExtension(self, filePath: str) -> str:
+        return os.path.splitext(filePath)[0]
+
+    def getFileExtension(self, filePath: str) -> str:
+        return os.path.splitext(filePath)[1]
+
+    def openFile(self, path: str, args: str) -> typing.IO[str]:
+        return open(path, args, encoding="utf8")
+
+    def makeDir(self, dirPath: str) -> None:
+        os.makedirs(dirPath, exist_ok=True)
+
+    def copyFile(self, oriFilePath: str, newFilePath: str) -> None:
+        shutil.copy2(oriFilePath, newFilePath)
+
+    # Compare last modification date.
+    def isFileChanged(self, sourceFilePath: str,
+                      targetFilePath: str) -> bool:
+        return (os.path.getmtime(sourceFilePath)
+                > os.path.getmtime(targetFilePath))
+
+    def isFileIdentical(self, firstFilePath: str,
+                        secondFilePath: str) -> bool:
+        return filecmp.cmp(firstFilePath, secondFilePath,
+                           shallow=False)
+
+    def checkConverter(self, converterPath: str,
+                       converterArg: str) -> bool:
+        try:
+            return subprocess.run([converterPath, converterArg],
+                                  stdout=subprocess.DEVNULL,
+                                  stderr=subprocess.DEVNULL).returncode == 0
+        except:
+            return False;
+# /Converter
+
+class GUI:
+    def __init__(self) -> None:
+        self.converter: Converter = Converter(self.log)
+
+        self.rootWindow: tk.Tk = tk.Tk()
+        self.rootWindow.title("Music Library Converter")
+
+        self.pickGrid: tk.Frame = tk.Frame(self.rootWindow)
+
+        self.plDirLabel: tk.Label = tk.Label(self.pickGrid, text="Playlists directory")
+        self.plDirEntry: tk.Entry = tk.Entry(self.pickGrid)
+        self.plDirButton: tk.Button = tk.Button(self.pickGrid,
+            command=lambda: self.setDirEntryFromPicker(self.plDirEntry), text="Browse")
+
+        self.targetDirLabel: tk.Label = tk.Label(self.pickGrid, text="Target directory")
+        self.targetDirEntry: tk.Entry = tk.Entry(self.pickGrid)
+        self.targetDirButton: tk.Button = tk.Button(self.pickGrid,
+            command=lambda: self.setDirEntryFromPicker(self.targetDirEntry), text="Browse")
+
+        self.converterPathLabel: tk.Label = tk.Label(self.pickGrid, text="Converter path")
+        self.converterPathEntry: tk.Entry = tk.Entry(self.pickGrid)
+        self.converterPathEntry.insert(tk.INSERT, CONVERTER_NAME)
+        self.converterPathButton: tk.Button = tk.Button(self.pickGrid,
+            command=lambda: self.setFileEntryFromPicker(self.converterPathEntry), text="Browse")
+
+        self.runFrame: tk.Frame = tk.Frame(self.rootWindow)
+
+        self.runButton: tk.Button = tk.Button(self.runFrame,
+            command=self.run, text="Run Configuration")
+
+        self.cancelButton: tk.Button = tk.Button(self.runFrame,
+            command=self.cancel, text="Cancel")
+        self.cancelButton["state"] = "disabled"
+
+        self.actionLabel: tk.Label = tk.Label(self.runFrame,
+            text="Action")
+        self.action: tk.StringVar = tk.StringVar(self.runFrame)
+        self.action.set("convert")
+        self.actionOption: tk.OptionMenu = tk.OptionMenu(self.runFrame,
+            self.action, "convert", "check")
+        self.actionOption.config(width=8)
+
+        self.profileLabel: tk.Label = tk.Label(self.runFrame,
+            text="Profile")
+        self.profile: tk.StringVar = tk.StringVar(self.runFrame)
+        self.profile.set("mp3")
+        self.profileOption: tk.OptionMenu = tk.OptionMenu(self.runFrame,
+            self.profile, "mp3", "aiff", "flac")
+        self.profileOption.config(width=5)
+
+        self.trim: tk.IntVar = tk.IntVar(value=1)
+        self.trimCheck: tk.Checkbutton = tk.Checkbutton(self.runFrame,
+            text="Trim track N", variable=self.trim)
+
+        self.jobsLabel: tk.Label = tk.Label(self.runFrame,
+            text="Jobs")
+        self.jobs: tk.StringVar = tk.StringVar(value="1")
+        self.jobsOption: tk.OptionMenu = tk.OptionMenu(self.runFrame,
+            self.jobs, "1", "2", "3", "4", "5", "6", "7", "8", "9", "10")
+
+        self.logOutput: tkst.ScrolledText = tkst.ScrolledText(self.rootWindow)
+        self.logOutput.configure(state="disabled")
+        self.logLock: threading.Lock = threading.Lock()
+
+    def init(self) -> None:
+        self.rootWindow.grid_columnconfigure(0, weight=1)
+        self.rootWindow.grid_rowconfigure(2, weight=1)
+
+        self.pickGrid.grid(row=0, pady=5, sticky=tk.N+tk.E+tk.S+tk.W)
+        self.pickGrid.grid_columnconfigure(2, weight=1)
+
+        self.plDirLabel.grid(row=0, column=0, padx=5, sticky=tk.W)
+        self.plDirButton.grid(row=0, column=1, padx=5, sticky=tk.W)
+        self.plDirEntry.grid(row=0, column=2, padx=5, sticky=tk.E+tk.W)
+        self.targetDirLabel.grid(row=1, column=0, padx=5, sticky=tk.W)
+        self.targetDirButton.grid(row=1, column=1, padx=5, sticky=tk.W)
+        self.targetDirEntry.grid(row=1, column=2, padx=5, sticky=tk.E+tk.W)
+        self.converterPathLabel.grid(row=2, column=0, padx=5, sticky=tk.W)
+        self.converterPathButton.grid(row=2, column=1, padx=5, sticky=tk.W)
+        self.converterPathEntry.grid(row=2, column=2, padx=5, sticky=tk.E+tk.W)
+
+        self.runFrame.grid(row=1, sticky=tk.N+tk.S+tk.W)
+
+        self.runButton.pack(side=tk.LEFT, padx=(5, 5))
+        self.cancelButton.pack(side=tk.LEFT, padx=(0, 5))
+        self.actionLabel.pack(side=tk.LEFT, padx=(0, 5))
+        self.actionOption.pack(side=tk.LEFT, padx=(0, 5))
+        self.profileLabel.pack(side=tk.LEFT, padx=(0, 5))
+        self.profileOption.pack(side=tk.LEFT, padx=(0, 5))
+        self.trimCheck.pack(side=tk.LEFT, padx=(0, 5))
+        self.jobsLabel.pack(side=tk.LEFT, padx=(0, 5))
+        self.jobsOption.pack(side=tk.LEFT, padx=(0, 5))
+
+        self.logOutput.grid(row=2, padx=5, pady=5, sticky=tk.N+tk.E+tk.S+tk.W)
+
+        self.rootWindow.mainloop()
+
+    def run(self) -> None:
+        self.plDirButton["state"] = "disabled"
+        self.plDirEntry["state"] = "disabled"
+        self.targetDirButton["state"] = "disabled"
+        self.targetDirEntry["state"] = "disabled"
+        self.converterPathButton["state"] = "disabled"
+        self.converterPathEntry["state"] = "disabled"
+        self.runButton["state"] = "disabled"
+        self.actionOption["state"] = "disabled"
+        self.profileOption["state"] = "disabled"
+        self.trimCheck["state"] = "disabled"
+        self.jobsOption["state"] = "disabled"
+        self.cancelButton["state"] = "normal"
+
+        # Clear log.
+        self.logOutput.configure(state="normal")
+        self.logOutput.delete("1.0", tk.END)
+        self.logOutput.configure(state="disabled")
+
+        thread: threading.Thread = threading.Thread(target=self.process)
+        thread.start()
+
+    def cancel(self) -> None:
+        self.cancelButton["state"] = "disabled"
+        self.converter.stop()
+
+    def process(self) -> None:
+        if self.action.get() == "check":
+            self.converter.check(self.plDirEntry.get(), self.targetDirEntry.get())
+        else:
+            self.converter.convert(
+                ConvertParams(self.plDirEntry.get(), self.targetDirEntry.get(),
+                              self.profile.get(), self.converterPathEntry.get(),
+                              bool(self.trim.get()), int(self.jobs.get())))
+
+        # Processing is done.
+        self.plDirButton["state"] = "normal"
+        self.plDirEntry["state"] = "normal"
+        self.targetDirButton["state"] = "normal"
+        self.targetDirEntry["state"] = "normal"
+        self.converterPathButton["state"] = "normal"
+        self.converterPathEntry["state"] = "normal"
+        self.runButton["state"] = "normal"
+        self.actionOption["state"] = "normal"
+        self.profileOption["state"] = "normal"
+        self.trimCheck["state"] = "normal"
+        self.jobsOption["state"] = "normal"
+        self.cancelButton["state"] = "disabled"
+
+    def log(self, message: str) -> None:
+        self.logLock.acquire()
+        self.logOutput.configure(state="normal")
+
+        if len(message) > 0 and message[-1] == "\n":
+            self.logOutput.insert(tk.END, message)
+        else:
+            self.logOutput.insert(tk.END, message + "\n")
+
+        #self.logOutput.see(tk.END) Causes segfault
+        self.logOutput.configure(state="disabled")
+        self.logLock.release()
+
+    def setDirEntryFromPicker(self, entry) -> None:
+        choice: str = tkfd.askdirectory()
+
+        if len(choice) != 0:
+            entry.delete(0, tk.END)
+            entry.insert(tk.INSERT, choice)
+
+    def setFileEntryFromPicker(self, entry) -> None:
+        choice: str = tkfd.askopenfilename()
+
+        if len(choice) != 0:
+            entry.delete(0, tk.END)
+            entry.insert(tk.INSERT, choice)
+# /GUI
+
+def cli_print_message(message: str) -> None:
+    if len(message) > 0 and message[-1] == "\n":
+        print(message, end="")
+    else:
+        print(message)
+
+def cli_main() -> None:
+    parser: argparse.ArgumentParser = argparse.ArgumentParser(
+        description="Music library converter",
+        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+
+    parser.add_argument("playlist-dir", help="Playlist directory",
+                        type=str)
+    parser.add_argument("target-dir", help="Target directory",
+                        type=str)
+    parser.add_argument("--profile", "-p", help="Conversion profile",
+                        type=str, default="mp3")
+    parser.add_argument("--converter", "-c", help="Converter path",
+                        type=str, default=CONVERTER_NAME)
+    parser.add_argument("--trim", "-t", help="Trim track numbers in file names",
+                        action="store_true")
+    parser.add_argument("--jobs", "-j", help="Number of concurrent jobs",
+                        type=int, default=1)
+    parser.add_argument("--check", help="Check mode", action="store_true")
+
+    args: dict = vars(parser.parse_args())
+    converter: Converter = Converter(cli_print_message)
+
+    def cli_stop_signal(signal, frame):
+        converter.stop()
+
+    signal.signal(signal.SIGINT, cli_stop_signal)
+
+    if args["check"] == True:
+        converter.check(args["playlist-dir"], args["target-dir"])
+    else:
+        converter.convert(ConvertParams(args["playlist-dir"], args["target-dir"],
+                                        args["profile"], args["converter"],
+                                        args["trim"], args["jobs"]))
+
+def gui_main() -> None:
+    gui: GUI = GUI()
+    gui.init()
+
+if __name__ == "__main__":
+    if len(sys.argv) <= 1:
+        gui_main()
+    else:
+        cli_main()