# -*- coding: utf-8 -*-
"""Audio duplicate finder.

Finds duplicate audio recordings inside a directory tree by extracting
chroma features and comparing them with perceptual image hashing, DTW
warping-path analysis, or a combination of both.
"""
import csv
import os
import os.path
import subprocess as sp
import time
from csv import writer

import imagehash
import librosa
import numpy as np
import openpyxl
import torch
from dtw import dtw
from fastdtw import fastdtw
from matplotlib import pyplot as plt
from nnAudio import features
from PIL import Image
from scipy.spatial.distance import euclidean
from synctoolbox.dtw import mrmsdtw
from synctoolbox.feature.chroma import pitch_to_chroma, quantize_chroma, quantized_chroma_to_CENS
from synctoolbox.feature.pitch import audio_to_pitch_features

DEVNULL = open(os.devnull, 'w')

# Global settings variables --------------------------------------------------
fs = 11025  # sampling rate used for all audio loading and feature extraction
output_filename = "DuplicateFinder_results"
output_filename_duplicates_only = "DuplicateFinder_results_duplicates_only"

# Global variables for functions ---------------------------------------------
spec_layer = None  # lazily-initialized nnAudio CQT layer (see _calculate_chromagram_cuda)

# Audio file extensions recognized everywhere in this module
AUDIO_EXTENSIONS = ['mp3', 'mp4', 'ogg', 'wav']


# Functions for audio duplicates finding --------------------------------------

def find_duplicates(path, outputpath=None, accuracy="normal", chroma_method="cuda"):
    """Simplest entry point: run duplicate finding with a named accuracy preset.

    :param path: Directory in which to look for duplicates.
    :param outputpath: Output directory for chroma features and final results;
        if not set, a folder is created inside ``path``.
    :param accuracy: One of "low", "normal", "high", "extreme".
        "low" finds exactly identical duplicates; "normal" tolerates some
        noise at the beginning/end or low-bitrate re-encodes; "high" is like
        "normal" with a lower tolerance for differences; "extreme" handles
        very long noise passages at the beginning/end of a duplicate.
    :param chroma_method: "cuda" for the nnAudio (CUDA) implementation,
        "synctoolbox" for the CUDA-free synctoolbox implementation.
    :raises ValueError: If ``accuracy`` is not one of the recognized presets.
    """
    if accuracy == "low":
        find_duplicates_img_hashing(path, outputpath, chroma_method, hashdiff_tresh=0)
    elif accuracy == "normal":
        find_duplicates_combined(path, outputpath, chroma_method, hashdiff_tresh=10,
                                 dtwarea=1000000, verify_extremes=False)
    elif accuracy == "high":
        find_duplicates_combined(path, outputpath, chroma_method, hashdiff_tresh=20,
                                 dtwarea=1000000, verify_extremes=False)
    elif accuracy == "extreme":
        # BUGFIX: chroma_method was previously dropped here, silently forcing "cuda".
        find_duplicates_dtw(path, outputpath, chroma_method, dtwarea=10000000,
                            verify_extremes=True)
    else:
        # Previously an unknown preset was silently ignored.
        raise ValueError("Unknown accuracy preset: " + str(accuracy))


def _prepare_output_paths(path, outputpath):
    """Resolve (and create if missing) the output directory.

    Returns the resolved output directory together with the four result-file
    paths: full csv/xlsx and duplicates-only csv/xlsx.
    """
    if outputpath is None:  # if outputpath is not defined, it gets created inside path directory
        outputpath = os.path.join(path, "DuplicateFinder")
    if not os.path.isdir(outputpath):
        os.mkdir(outputpath)
    csvfiledir = os.path.join(outputpath, output_filename + ".csv")
    excelfiledir = os.path.join(outputpath, output_filename + ".xlsx")
    csvfiledir_duplicates_only = os.path.join(outputpath, output_filename_duplicates_only + ".csv")
    excelfiledir_duplicates_only = os.path.join(outputpath, output_filename_duplicates_only + ".xlsx")
    return (outputpath, csvfiledir, excelfiledir,
            csvfiledir_duplicates_only, excelfiledir_duplicates_only)


def _print_summary(tcalcstart, numofduplicatepairs):
    """Print elapsed calculation time and the number of duplicate pairs found."""
    calctime = round(time.time() - tcalcstart, 2)
    print("\nCalculation finished!")
    print("Total calculation time: " + str(calctime) + " s")
    print("Number of duplicate pairs found: " + str(numofduplicatepairs))


def find_duplicates_dtw(path, outputpath=None, chroma_method="cuda", dtwtype="mrmsdtw",
                        dtwarea=1000000, verify_extremes=False, testpointsnum=100,
                        diffpointstolerance=5, segmentdivider=4):
    """Find duplicates in ``path`` using only the DTW method; write .csv/.xlsx results.

    :param path: Directory in which to look for duplicates.
    :param outputpath: Output directory; auto-created inside ``path`` if None.
    :param chroma_method: "cuda" (nnAudio) or "synctoolbox".
    :param dtwtype: DTW implementation: "mrmsdtw", "dtw" or "fastdtw".
    :param dtwarea: Implementation-specific size parameter — tau for mrmsdtw,
        radius for fastdtw.
    :param verify_extremes: If True, the warping path is also evaluated with
        flipped axes; useful when one recording starts with a very long
        passage of silence/applause/noise.
    :param testpointsnum: Number of points tested between the referential points.
    :param diffpointstolerance: Percentage of tested points that may deviate
        while the pair is still judged identical.
    :param segmentdivider: Where the referential line is anchored — e.g. 4
        means the test line runs from 1/4 to 3/4 of the warping path.
    """
    # list containing paths to all audio files (every subdirectory)
    filedirs_all = np.asarray(librosa.util.find_files(path, ext=AUDIO_EXTENSIONS)).tolist()
    (outputpath, csvfiledir, excelfiledir,
     csvfiledir_duplicates_only, excelfiledir_duplicates_only) = _prepare_output_paths(path, outputpath)

    tcalcstart = time.time()
    _calculate_chromagrams(path, outputpath, chroma_method)
    duplicatepairslist = _return_duplicates_dtw(path, outputpath, chroma_method, dtwtype,
                                                dtwarea, verify_extremes, testpointsnum,
                                                diffpointstolerance, segmentdivider)
    _create_output_files(duplicatepairslist, filedirs_all, csvfiledir, excelfiledir,
                         csvfiledir_duplicates_only, excelfiledir_duplicates_only)
    _print_summary(tcalcstart, len(duplicatepairslist))


def find_duplicates_img_hashing(path, outputpath=None, chroma_method="cuda", hashdiff_tresh=10):
    """Find duplicates in ``path`` using only image hashing; write .csv/.xlsx results.

    :param path: Directory in which to look for duplicates.
    :param outputpath: Output directory; auto-created inside ``path`` if None.
    :param chroma_method: "cuda" (nnAudio) or "synctoolbox".
    :param hashdiff_tresh: Hash-difference threshold below which two recordings
        are evaluated as the same.
    """
    # list containing paths to all audio files (every subdirectory)
    filedirs_all = np.asarray(librosa.util.find_files(path, ext=AUDIO_EXTENSIONS)).tolist()
    (outputpath, csvfiledir, excelfiledir,
     csvfiledir_duplicates_only, excelfiledir_duplicates_only) = _prepare_output_paths(path, outputpath)
    chroma_folder = os.path.join(outputpath, "chroma_files")
    chroma_imgs_folder = os.path.join(outputpath, "chroma_files_imgs")

    tcalcstart = time.time()
    _calculate_chromagrams(path, outputpath, chroma_method)
    _export_chromafiles_as_imgs(chroma_folder, chroma_imgs_folder)
    pairslist = _return_duplicates_img_hashing(path, outputpath, hashdiff_tresh)
    # NOTE: summary is printed before the output files are written (original order kept)
    _print_summary(tcalcstart, len(pairslist))
    _create_output_files(pairslist, filedirs_all, csvfiledir, excelfiledir,
                         csvfiledir_duplicates_only, excelfiledir_duplicates_only)


def find_duplicates_combined(path, outputpath=None, chroma_method="cuda", hashdiff_tresh=10,
                             dtwtype="mrmsdtw", dtwarea=1000000, verify_extremes=False,
                             testpointsnum=100, diffpointstolerance=5, segmentdivider=4):
    """Find duplicates using image hashing as a pre-filter, then verify with DTW.

    Image hashing first selects candidate pairs; only those pairs are then
    evaluated with the (more expensive) DTW method.

    :param path: Directory in which to look for duplicates.
    :param outputpath: Output directory; auto-created inside ``path`` if None.
    :param chroma_method: "cuda" (nnAudio) or "synctoolbox".
    :param hashdiff_tresh: Hash-difference threshold for the pre-filter.
    :param dtwtype: DTW implementation: "mrmsdtw", "dtw" or "fastdtw".
    :param dtwarea: Implementation-specific size parameter (tau / radius).
    :param verify_extremes: Also evaluate the flipped warping path (see
        :func:`find_duplicates_dtw`).
    :param testpointsnum: Number of points tested between the referential points.
    :param diffpointstolerance: Percentage of tested points allowed to deviate.
    :param segmentdivider: Anchoring of the referential line (see
        :func:`find_duplicates_dtw`).
    """
    # list containing paths to all audio files (every subdirectory)
    filedirs_all = np.asarray(librosa.util.find_files(path, ext=AUDIO_EXTENSIONS)).tolist()
    (outputpath, csvfiledir, excelfiledir,
     csvfiledir_duplicates_only, excelfiledir_duplicates_only) = _prepare_output_paths(path, outputpath)
    chroma_folder = os.path.join(outputpath, "chroma_files")
    chroma_imgs_folder = os.path.join(outputpath, "chroma_files_imgs")

    tcalcstart = time.time()
    _calculate_chromagrams(path, outputpath, chroma_method)
    _export_chromafiles_as_imgs(chroma_folder, chroma_imgs_folder)
    # image hashing duplicates pre-calculation
    pairslistimghashing = _return_duplicates_img_hashing(path, outputpath, hashdiff_tresh)
    # DTW calculation of only the pairs pre-selected by image hashing
    pairslistfinal = _return_duplicates_dtw(path, outputpath, chroma_method, dtwtype, dtwarea,
                                            verify_extremes, testpointsnum, diffpointstolerance,
                                            segmentdivider, pairslistimghashing)
    _create_output_files(pairslistfinal, filedirs_all, csvfiledir, excelfiledir,
                         csvfiledir_duplicates_only, excelfiledir_duplicates_only)
    _print_summary(tcalcstart, len(pairslistfinal))


def is_chroma_duplicate(chroma1, chroma2, dtwtype="mrmsdtw", dtwarea=1000000,
                        verify_extremes=False, testpointsnum=100, diffpointstolerance=5,
                        segmentdivider=4, showplot=False):
    """Check whether two chromagrams correspond to duplicate recordings.

    :param chroma1: Chromagram of the first recording.
    :param chroma2: Chromagram of the second recording.
    :param dtwtype: DTW implementation: "mrmsdtw", "dtw" or "fastdtw".
    :param dtwarea: Implementation-specific size parameter (tau / radius).
    :param verify_extremes: If the first pass says "not duplicates", retry
        with the path axes flipped (helps with long leading noise passages).
    :param testpointsnum: Number of points tested between the referential points.
    :param diffpointstolerance: Percentage of tested points allowed to deviate.
    :param segmentdivider: Anchoring of the referential line.
    :param showplot: If True, show the diagnostic plot.
    :return: True if the chromagrams are judged to be the same recording.
    :raises ValueError: If ``dtwtype`` is not a recognized implementation.
    """
    # DTW ---------------------------
    if dtwtype == "mrmsdtw":
        path = mrmsdtw.sync_via_mrmsdtw(chroma1, chroma2, dtw_implementation="librosa",
                                        threshold_rec=dtwarea)
        pathx = np.array(path[0, :])  # split the warping path into two np arrays
        pathy = np.array(path[1, :])
    elif dtwtype == "fastdtw":
        # flip the chroma vector axes (fastdtw expects time as the first axis)
        chroma1 = np.swapaxes(chroma1, 0, 1)
        chroma2 = np.swapaxes(chroma2, 0, 1)
        distance, path = fastdtw(x=chroma1, y=chroma2, dist=euclidean, radius=dtwarea)
        # reverse so it runs ascending, and split into two separate arrays
        pathx, pathy = zip(*path[::-1])
        pathx = np.array(pathx)
        pathy = np.array(pathy)
    elif dtwtype == "dtw":
        chroma1 = np.swapaxes(chroma1, 0, 1)
        chroma2 = np.swapaxes(chroma2, 0, 1)
        path = dtw(chroma1, chroma2, dist=euclidean)
        pathx = path[3][0]  # split the warping path into two np arrays
        pathy = path[3][1]
    else:
        # Previously: print + quit(), which killed the interpreter from library code.
        raise ValueError("Wrong dtwtype input argument!")

    issame, plot = _verify_path_flatness(pathx, pathy, testpointsnum=testpointsnum,
                                         diffpointstolerance=diffpointstolerance,
                                         segmentdivider=segmentdivider)
    if verify_extremes and not issame:
        # Flip the axes and verify again — helps when one of the files starts
        # with a very long passage of noise.
        plot.clf()
        issame, plot = _verify_path_flatness(pathy, pathx, testpointsnum=testpointsnum,
                                             diffpointstolerance=diffpointstolerance,
                                             segmentdivider=segmentdivider)
    if showplot:
        plot.show()
    return issame


def is_duplicate(audiofile1_name, audiofile2_name, chroma_method="cuda", dtwtype="mrmsdtw",
                 dtwarea=1000000, verify_extremes=False, testpointsnum=100,
                 diffpointstolerance=5, segmentdivider=4, showplot=False):
    """Check whether two audio files are duplicates.

    :param audiofile1_name: Path to the first audio file.
    :param audiofile2_name: Path to the second audio file.
    :param chroma_method: "cuda" (nnAudio) or "synctoolbox".
    :param dtwtype: DTW implementation: "mrmsdtw", "dtw" or "fastdtw".
    :param dtwarea: Implementation-specific size parameter (tau / radius).
    :param verify_extremes: Also evaluate the flipped warping path.
    :param testpointsnum: Number of points tested between the referential points.
    :param diffpointstolerance: Percentage of tested points allowed to deviate.
    :param segmentdivider: Anchoring of the referential line.
    :param showplot: If True, show the diagnostic plot.
    :return: True if the two audio files are judged to be the same recording.
    :raises ValueError: If ``chroma_method`` is not recognized (previously a NameError).
    """
    audio1, _ = _ffmpeg_load_audio(audiofile1_name, sr=fs, mono=True)
    audio2, _ = _ffmpeg_load_audio(audiofile2_name, sr=fs, mono=True)
    if chroma_method == "cuda":
        chroma1 = _calculate_chromagram_cuda(audio1)
        chroma2 = _calculate_chromagram_cuda(audio2)
    elif chroma_method == "synctoolbox":
        chroma1 = _calculate_chromagram_synctoolbox(audio1)
        chroma2 = _calculate_chromagram_synctoolbox(audio2)
    else:
        raise ValueError("Wrong chroma_method input argument!")
    print("Checking whether files \"" + os.path.basename(audiofile1_name) + "\" and \""
          + os.path.basename(audiofile2_name) + "\" are duplicates:")
    isDuplicate = is_chroma_duplicate(chroma1, chroma2, dtwtype, dtwarea, verify_extremes,
                                      testpointsnum, diffpointstolerance, segmentdivider, showplot)
    print(isDuplicate)
    return isDuplicate


# Helper functions -------------------------------------------------------------

def _csvaddtocell(csvdir, row, column, value):
    """Append ``value`` to the cell at (row, column) of the csv at ``csvdir``.

    If the cell is non-empty the value is appended after ", "; the whole file
    is rewritten in place.
    """
    with open(csvdir, 'r', encoding="utf-8") as f:
        mylist = list(csv.reader(f))
    if len(mylist[row][column]) == 0:
        mylist[row][column] = str(value)
    else:
        mylist[row][column] = str(mylist[row][column]) + ", " + str(value)
    with open(csvdir, 'w', newline='', encoding="utf-8") as out:
        csv.writer(out).writerows(mylist)


def _append_row_csv(csvdir, row):
    """Append one row (a list of cell values) to the csv file at ``csvdir``."""
    # NOTE: parameter renamed from `list` (shadowed the builtin)
    with open(csvdir, 'a', newline='') as f_object:
        writer(f_object).writerow(row)


def convert_csv_to_xlsx(csvfile, xlsxfile):
    """Export the csv file at ``csvfile`` to an xlsx workbook at ``xlsxfile``."""
    wb = openpyxl.Workbook()
    ws = wb.active
    with open(csvfile, 'r', encoding="utf-8") as f:
        for row in csv.reader(f):
            ws.append(row)
    wb.save(xlsxfile)


def _ffmpeg_load_audio(filename, sr=44100, mono=False, normalize=True,
                       in_type=np.int16, out_type=np.float32):
    """Load an audio file by piping raw PCM out of an external ffmpeg process.

    :param filename: Path to the audio file.
    :param sr: Target sampling rate.
    :param mono: Downmix to one channel if True, otherwise two channels.
    :param normalize: Scale float output to peak 1.0.
    :param in_type: PCM sample dtype requested from ffmpeg.
    :param out_type: dtype of the returned array.
    :return: (audio, sr) where audio is 1-D (mono) or (channels, samples).
    """
    channels = 1 if mono else 2
    format_strings = {
        np.float64: 'f64le',
        np.float32: 'f32le',
        np.int16: 's16le',
        np.int32: 's32le',
        np.uint32: 'u32le'
    }
    format_string = format_strings[in_type]
    command = [
        'ffmpeg',
        '-i', filename,
        '-f', format_string,
        '-acodec', 'pcm_' + format_string,
        '-ar', str(sr),
        '-ac', str(channels),
        '-']
    # BUGFIX: shell=True with a list argument runs bare "ffmpeg" with no
    # arguments on POSIX; a list command must use shell=False (the default).
    p = sp.Popen(command, stdout=sp.PIPE, stderr=DEVNULL, bufsize=4096)
    bytes_per_sample = np.dtype(in_type).itemsize
    frame_size = bytes_per_sample * channels
    chunk_size = frame_size * sr  # read in 1-second chunks
    raw = b''
    with p.stdout as stdout:
        while True:
            data = stdout.read(chunk_size)
            if data:
                raw += data
            else:
                break
    # BUGFIX: np.fromstring is deprecated/removed; frombuffer is the
    # byte-exact replacement for binary input.
    audio = np.frombuffer(raw, dtype=in_type).astype(out_type)
    if channels > 1:
        audio = audio.reshape((-1, channels)).transpose()
    if audio.size == 0:
        return audio, sr
    if issubclass(out_type, np.floating):
        if normalize:
            peak = np.abs(audio).max()
            if peak > 0:
                audio /= peak
        elif issubclass(in_type, np.integer):
            # un-normalized float output: scale by the integer full-scale value
            audio /= np.iinfo(in_type).max
    return audio, sr


def _create_output_files(duplicatepairslist, filedirs_all, csvfiledir, excelfiledir,
                         csvfiledir_duplicates_only, excelfiledir_duplicates_only):
    """Write duplicate-pair results into the csv files and export both to xlsx.

    Cross-references each pair into the full results csv (columns 3/4) and
    appends it to the duplicates-only csv.
    """
    if os.path.exists(csvfiledir_duplicates_only):
        os.remove(csvfiledir_duplicates_only)
    # header for the csv containing only the list of duplicates
    _append_row_csv(csvfiledir_duplicates_only,
                    ["File 1 directory", "File 1 name", "File 2 directory", "File 2 name"])
    for duplicatepair in duplicatepairslist:
        file1name = os.path.basename(duplicatepair[0])  # filename with extension only
        file2name = os.path.basename(duplicatepair[1])
        # i and j are the file IDs (row indices) in the full results csv
        i = filedirs_all.index(duplicatepair[0])
        j = filedirs_all.index(duplicatepair[1])
        _csvaddtocell(csvfiledir, i + 1, 3, file2name)
        _csvaddtocell(csvfiledir, i + 1, 4, j)
        _csvaddtocell(csvfiledir, j + 1, 3, file1name)
        _csvaddtocell(csvfiledir, j + 1, 4, i)
        _append_row_csv(csvfiledir_duplicates_only,
                        [duplicatepair[0], file1name, duplicatepair[1], file2name])
    convert_csv_to_xlsx(csvfiledir, excelfiledir)
    convert_csv_to_xlsx(csvfiledir_duplicates_only, excelfiledir_duplicates_only)


def _find_nearest(array, value):
    """Return the element of ``array`` nearest to ``value``."""
    index = np.abs(array - value).argmin()
    return array.flat[index]


# chroma features calculation functions ----------------------------------------

def _calculate_chromagram_cuda(audio):
    """Compute a chromagram with nnAudio's CQT (CUDA if available) + librosa.

    The CQT layer is created once and cached in the module-global
    ``spec_layer``.
    """
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    global spec_layer
    if spec_layer is None:
        if torch.cuda.is_available():
            spec_layer = features.CQT(sr=fs, hop_length=512).cuda()
        else:
            spec_layer = features.CQT(sr=fs, hop_length=512).cpu()
    # CQT spectrogram via nnAudio, then handed to librosa for the chromagram
    audio = torch.tensor(audio, device=device).float()
    cqt = spec_layer(audio)
    cqt = cqt.cpu().detach().numpy()[0]
    chroma = librosa.feature.chroma_cqt(C=cqt, sr=fs, hop_length=512)
    return chroma


def _calculate_chromagram_synctoolbox(audio):
    """Compute a quantized chromagram using the synctoolbox pipeline."""
    f_pitch = audio_to_pitch_features(audio, Fs=fs)
    f_chroma = pitch_to_chroma(f_pitch=f_pitch)
    f_chroma_quantized = quantize_chroma(f_chroma=f_chroma)
    return f_chroma_quantized


def _calculate_chromagrams(path, outputpath, chroma_method):
    """Extract (or reuse cached) chroma features for every audio file under ``path``.

    Saves one ``<filename>.npy`` per audio file under
    ``outputpath/chroma_files/<relative folder>`` and (re)writes the full
    results csv with one row per file.
    """
    # collect folders that contain audio files, skipping our own output folders
    audiofolderlist = []
    subfolders = [x[0] for x in os.walk(path)]
    for folder in subfolders:
        if "chroma_files" in folder or "DuplicateFinder" in folder:
            continue
        if np.asarray(librosa.util.find_files(folder, ext=AUDIO_EXTENSIONS,
                                              recurse=False)).size != 0:
            audiofolderlist.append(folder)
    # list containing paths to all audio files (every subdirectory)
    filedirs = np.asarray(librosa.util.find_files(path, ext=AUDIO_EXTENSIONS))
    filesnumber = filedirs.size
    csvfiledir = os.path.join(outputpath, output_filename + ".csv")
    header = ['File ID', 'File directory', 'File name', 'Duplicate file names', 'Duplicate IDs']
    with open(csvfiledir, 'w', encoding="utf-8", newline='') as csvfile:
        csvwriter = csv.writer(csvfile)
        csvwriter.writerow(header)
        currentfilenum = 0
        for folder in audiofolderlist:
            folderreldir = os.path.relpath(folder, path)
            chromapath = os.path.join(outputpath, "chroma_files", folderreldir)
            if not os.path.isdir(chromapath):
                os.makedirs(chromapath)
            # audio file paths in the current subfolder only
            folderfiledirs = np.asarray(librosa.util.find_files(folder, ext=AUDIO_EXTENSIONS,
                                                                recurse=False))
            for audiofile in folderfiledirs:
                filename = os.path.basename(audiofile)
                chromafilepath = os.path.join(chromapath, filename + ".npy")
                # only compute chroma features if not cached in chroma_files yet
                if not os.path.exists(chromafilepath):
                    print("Extracting chroma features from file \"" + os.path.basename(audiofile)
                          + "\" (" + str(currentfilenum + 1) + "/" + str(filesnumber) + ")")
                    wave, _ = _ffmpeg_load_audio(audiofile, sr=fs, mono=True)
                    if chroma_method == "cuda":
                        chroma = _calculate_chromagram_cuda(wave)
                    elif chroma_method == "synctoolbox":
                        chroma = _calculate_chromagram_synctoolbox(wave)
                    print("\n")
                    np.save(chromafilepath, chroma)
                else:
                    print("Chroma features corresponding to file: \"" + os.path.basename(audiofile)
                          + "\" have been loaded!" + " (" + str(currentfilenum + 1) + "/"
                          + str(filesnumber) + ")")
                csvwriter.writerow([currentfilenum, audiofile, filename, "", ""])
                currentfilenum = currentfilenum + 1
    print("\nChroma features have been successfuly extracted from all audio files!\n")


def _verify_path_flatness(pathx, pathy, testpointsnum, diffpointstolerance, segmentdivider):
    """Evaluate whether a DTW warping path is flat (i.e. the recordings match).

    A straight line is fitted through two reference points at 1/segmentdivider
    and (segmentdivider-1)/segmentdivider of the path; test points between
    them must lie close to that line.

    :return: (issame, plt) — the verdict and the matplotlib module holding the
        diagnostic plot.
    """
    # reference points for the line approximation
    pathxminval = min(pathx)  # start and end of the line on the x axis
    pathxmaxval = max(pathx)
    pathxvalrange = pathxmaxval - pathxminval
    # make the range divisible by segmentdivider
    modulo = pathxvalrange % segmentdivider
    pathxvalrange = pathxvalrange - modulo
    refpoint1xval = int(pathxminval + (pathxvalrange / segmentdivider))
    refpoint2xval = int(pathxminval + (pathxvalrange / segmentdivider) * (segmentdivider - 1))
    # positions in pathx where these reference values occur
    # (indexed to a scalar: int() on a non-0-d array is deprecated in NumPy)
    refpoint1xpos = int(np.argwhere(pathx == refpoint1xval)[0][0])
    refpoint2xpos = int(np.argwhere(pathx == refpoint2xval)[0][0])
    refpointsx = np.array([pathx[refpoint1xpos], pathx[refpoint2xpos]])
    refpointsy = np.array([pathy[refpoint1xpos], pathy[refpoint2xpos]])
    # Line approximation --------------------------------
    coefficients = np.polyfit(refpointsx, refpointsy, 1)
    polynomial = np.poly1d(coefficients)
    linex = np.arange(start=0, stop=len(pathx), step=1)
    liney = polynomial(linex)
    refpointsvaldiff = refpoint2xval - refpoint1xval
    if testpointsnum > refpointsvaldiff:
        # never test more points than exist strictly between the references
        testpointsnum = refpointsvaldiff - 1
    testpointstep = refpointsvaldiff / testpointsnum  # step size
    # Testing of points ---------------------------------
    # half-step shift so the first test point is not where the line crosses the path
    testpointxshift = testpointstep / 2
    diffpointsnum = 0  # number of test points deviating from the line
    for i in range(0, testpointsnum, 1):
        testpointxval = refpoint1xval + i * testpointstep + testpointxshift
        testpointnearestxval = _find_nearest(pathx, testpointxval)
        # first index of pathx equal to the test position
        # (an index of pathx does not always match its value!)
        testpointxpos = int(np.argwhere(pathx == testpointnearestxval)[0][0])
        pathval = float(pathy[testpointxpos])
        # value of the approximation line at the tested position
        # (here the index is always equal to the value)
        lineval = float(liney[int(testpointnearestxval)])
        if abs(pathval - lineval) > 1.2:  # deviation threshold
            diffpointsnum = diffpointsnum + 1
            plt.plot(testpointnearestxval, pathval, '+', markersize=12, color='red')
        else:
            plt.plot(testpointnearestxval, pathval, '+', markersize=12, color='green')
    diffpointsnumtolerance = round((diffpointstolerance / 100) * testpointsnum)
    # Evaluation: are the two recordings the same? ------
    issame = diffpointsnum <= diffpointsnumtolerance
    # PLOTTING
    plt.plot(pathx, pathy, color="black")
    plt.plot(linex[refpoint1xval:refpoint2xval + 1], liney[refpoint1xval:refpoint2xval + 1])
    plt.plot(refpointsx, refpointsy, 'o', markersize=8, color='blue')
    return issame, plt


# chromagram to image functions -------------------------------------------------

def _load_chromafile_and_save_as_img(chroma_filename, output_filename):
    """Load a saved chromagram (.npy) and save it as an RGB image file."""
    chroma = np.load(chroma_filename)
    chromaprocessed = (chroma * 256).astype(np.uint8)  # scale [0,1] chroma into 8-bit
    im = Image.fromarray(chromaprocessed)
    if im.mode != 'RGB':
        im = im.convert('RGB')
    im.save(output_filename)


def _export_chromafiles_as_imgs(chroma_folder, output_folder):
    """Export every chromagram in ``chroma_folder`` (recursively) as a .png bitmap."""
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)
    # list all chroma files in all subdirectories of chroma_folder
    chromalist = []
    for dirpath, dirnames, filenames in os.walk(chroma_folder):
        chromalist += [os.path.join(dirpath, file) for file in filenames]
    print("Saving chroma features as bitmaps for image hashing")
    for file in chromalist:
        chromadir = os.path.realpath(file)  # input chroma path
        chromareldir = os.path.relpath(chromadir, chroma_folder)
        chromaimgreldir = chromareldir.replace('.npy', '') + ".png"
        chromaoutputdir = os.path.join(output_folder, chromaimgreldir)
        directory = os.path.dirname(chromaoutputdir)
        if not os.path.isdir(directory):
            os.makedirs(directory)
        if not os.path.exists(chromaoutputdir):  # skip already-exported images
            _load_chromafile_and_save_as_img(chromadir, chromaoutputdir)


def _are_imgs_similar(file1, file2, hashdiff_tresh):
    """Return True if the perceptual-hash distance of two images is within the threshold."""
    hash1 = imagehash.phash(Image.open(file1))
    hash2 = imagehash.phash(Image.open(file2))
    hashdiff = abs(hash1 - hash2)
    return hashdiff <= hashdiff_tresh


# duplicate finding helper functions --------------------------------------------

def _return_duplicates_dtw(path, outputpath, chroma_method="cuda", dtwtype="mrmsdtw",
                           dtwarea=1000000, verify_extremes=False, testpointsnum=100,
                           diffpointstolerance=5, segmentdivider=4, filepairslist=None):
    """Return the list of duplicate pairs found under ``path`` using DTW.

    Chroma features are loaded from the pre-computed .npy cache under
    ``outputpath/chroma_files``. If ``filepairslist`` is None, every pairwise
    combination of audio files is tested; otherwise only the given pairs.
    (``chroma_method`` is kept for interface compatibility; features are
    always read from the cache here.)
    """
    # list containing paths to all audio files (every subdirectory)
    filedirs_all = np.asarray(librosa.util.find_files(path, ext=AUDIO_EXTENSIONS))
    filesnumber = filedirs_all.size
    currentpairnum = 1
    if filepairslist is None:
        # default: all unordered pairs of files
        filepairslist = [[filedirs_all[i], filedirs_all[j]]
                         for i in range(filesnumber)
                         for j in range(i + 1, filesnumber)]
    numofpairs = len(filepairslist)
    duplicatepairslist = []
    for filepair in filepairslist:
        file1dir = str(filepair[0])
        file2dir = str(filepair[1])
        # relative directory of each file (strips the drive and project folder)
        file1reldir = os.path.dirname(os.path.relpath(file1dir, path))
        file2reldir = os.path.dirname(os.path.relpath(file2dir, path))
        file1name = os.path.basename(filepair[0])  # filename with extension only
        file2name = os.path.basename(filepair[1])
        chroma1dir = os.path.join(outputpath, "chroma_files", file1reldir, file1name + ".npy")
        chroma2dir = os.path.join(outputpath, "chroma_files", file2reldir, file2name + ".npy")
        print("Using DTW to check whether files \"" + file1name + "\" and \"" + file2name
              + "\" are duplicates" + " (" + str(currentpairnum) + "/" + str(numofpairs) + ")")
        chroma1 = np.load(chroma1dir)
        chroma2 = np.load(chroma2dir)
        isduplicate = is_chroma_duplicate(chroma1, chroma2, dtwtype, dtwarea, verify_extremes,
                                          testpointsnum, diffpointstolerance, segmentdivider)
        print(isduplicate)
        if isduplicate:
            duplicatepairslist.append([file1dir, file2dir])
        currentpairnum = currentpairnum + 1
    return duplicatepairslist


def _return_duplicates_img_hashing(path, outputpath, hashdiff_tresh=10):
    """Return the list of duplicate pairs found under ``path`` using image hashing.

    Compares the pre-exported chroma bitmaps under
    ``outputpath/chroma_files_imgs`` for every pairwise combination of files.
    """
    # list containing paths to all audio files (every subdirectory)
    filedirs_all = np.asarray(librosa.util.find_files(path, ext=AUDIO_EXTENSIONS))
    filesnumber = filedirs_all.size
    currentpairnum = 1
    filepairslist = [[filedirs_all[i], filedirs_all[j]]
                     for i in range(filesnumber)
                     for j in range(i + 1, filesnumber)]
    numofpairs = len(filepairslist)
    duplicatepairslist = []
    for filepair in filepairslist:
        file1dir = str(filepair[0])
        file2dir = str(filepair[1])
        # relative directory of each file (strips the drive and project folder)
        file1reldir = os.path.dirname(os.path.relpath(file1dir, path))
        file2reldir = os.path.dirname(os.path.relpath(file2dir, path))
        file1name = os.path.basename(filepair[0])  # filename with extension only
        file2name = os.path.basename(filepair[1])
        chromaimg1dir = os.path.join(outputpath, "chroma_files_imgs", file1reldir,
                                     file1name + ".png")
        chromaimg2dir = os.path.join(outputpath, "chroma_files_imgs", file2reldir,
                                     file2name + ".png")
        print("Using image hashing to check whether files \"" + file1name + "\" and \""
              + file2name + "\" are similar" + " (" + str(currentpairnum) + "/"
              + str(numofpairs) + ")")
        isduplicate = _are_imgs_similar(chromaimg1dir, chromaimg2dir, hashdiff_tresh)
        print(isduplicate)
        if isduplicate:
            duplicatepairslist.append([file1dir, file2dir])
        currentpairnum = currentpairnum + 1
    return duplicatepairslist