mapletree/lxc2/mediaservices/app/duplicate_finder.py
2026-01-29 13:37:11 -07:00

719 lines
38 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import csv
import openpyxl
import librosa
from matplotlib import pyplot as plt
from nnAudio import features
import time
import os.path
from PIL import Image
import imagehash
from dtw import dtw
from scipy.spatial.distance import euclidean
from synctoolbox.dtw import mrmsdtw
from fastdtw import fastdtw
import torch
import numpy as np
import subprocess as sp
from csv import writer
import os
from synctoolbox.feature.pitch import audio_to_pitch_features
from synctoolbox.feature.chroma import pitch_to_chroma, quantize_chroma, quantized_chroma_to_CENS
# Writable handle to the OS null device, used to discard ffmpeg's stderr.
# NOTE(review): opened at import time and never closed; subprocess.DEVNULL would avoid the dangling handle.
DEVNULL = open(os.devnull, 'w')
# Global settings variables -------------------------------------------------------------------------------------
fs = 11025  # sample rate (Hz) used for all audio decoding and chroma extraction
output_filename = "DuplicateFinder_results"  # base name (no extension) of the full results report
output_filename_duplicates_only = "DuplicateFinder_results_duplicates_only"  # base name of the duplicates-only report
# Global variables for functions --------------------------------------------------------------------------------
spec_layer = None  # lazily-created nnAudio CQT layer, cached across calls by _calculate_chromagram_cuda
# Functions for audio duplicates finding ------------------------------------------------------------------------
def find_duplicates(path, outputpath = None, accuracy = "normal", chroma_method= "cuda"):
    """
    Simplest entry point: run the duplicate-finding algorithm with one of four accuracy presets.

    :param path: Directory of folder in which to look for duplicates
    :param outputpath: Output directory for chroma features and final results; if not set, a folder is created inside ``path``
    :param accuracy: "low" finds exactly identical duplicates; "normal" tolerates some noise at the beginning/end or low-bitrate re-encodes; "high" is like "normal" with a lower tolerance for differences; "extreme" handles very long noise passages at the start/end of a recording
    :param chroma_method: "cuda" uses the nnAudio implementation (CUDA), "synctoolbox" uses the same-named library without a CUDA dependency
    :raises ValueError: if ``accuracy`` is not one of "low", "normal", "high", "extreme"
    """
    if accuracy == "low":
        # exact-match preset: image hashing with zero hash-difference tolerance
        find_duplicates_img_hashing(path, outputpath, chroma_method, hashdiff_tresh = 0)
    elif accuracy == "normal":
        find_duplicates_combined(path, outputpath, chroma_method, hashdiff_tresh = 10, dtwarea = 1000000, verify_extremes = False)
    elif accuracy == "high":
        find_duplicates_combined(path, outputpath, chroma_method, hashdiff_tresh = 20, dtwarea = 1000000, verify_extremes = False)
    elif accuracy == "extreme":
        find_duplicates_dtw(path, outputpath, dtwarea = 10000000, verify_extremes = True)
    else:
        # previously an unknown setting silently did nothing; fail loudly instead
        raise ValueError('accuracy must be one of "low", "normal", "high" or "extreme", got: ' + repr(accuracy))
def find_duplicates_dtw(path, outputpath = None, chroma_method = "cuda", dtwtype ="mrmsdtw", dtwarea = 1000000, verify_extremes = False, testpointsnum = 100, diffpointstolerance = 5, segmentdivider = 4):
    """
    Scan an audio directory for duplicate recordings using DTW path analysis and write .csv/.xlsx reports.

    :param path: Directory of folder in which to look for duplicates
    :param outputpath: Output directory for chroma features and results; auto-created inside ``path`` when omitted
    :param chroma_method: "cuda" (nnAudio + CUDA) or "synctoolbox" for chroma feature extraction
    :param dtwtype: DTW flavour: "mrmsdtw", "dtw" or "fastdtw"
    :param dtwarea: DTW-flavour-specific size parameter (tau for mrmsdtw, radius for fastdtw)
    :param verify_extremes: When True, paths are also evaluated with flipped axes (helps with long silent/noisy intros)
    :param testpointsnum: Number of points tested between the reference points
    :param diffpointstolerance: Percentage of tested points allowed to deviate while still counting as a duplicate
    :param segmentdivider: Where the reference line is anchored (e.g. 4 -> from 1/4 to 3/4 of the warping path)
    """
    every_audio_file = list(librosa.util.find_files(path, ext=['mp3', 'mp4', 'ogg', 'wav']))
    if outputpath is None:
        outputpath = os.path.join(path, "DuplicateFinder")
    if not os.path.isdir(outputpath):
        os.mkdir(outputpath)
    results_csv = os.path.join(outputpath, output_filename + ".csv")
    results_xlsx = os.path.join(outputpath, output_filename + ".xlsx")
    duplicates_csv = os.path.join(outputpath, output_filename_duplicates_only + ".csv")
    duplicates_xlsx = os.path.join(outputpath, output_filename_duplicates_only + ".xlsx")
    start_time = time.time()
    # extract (or load cached) chroma features for every audio file
    _calculate_chromagrams(path, outputpath, chroma_method)
    # evaluate all candidate pairs with DTW
    duplicate_pairs = _return_duplicates_dtw(path, outputpath, chroma_method, dtwtype, dtwarea, verify_extremes, testpointsnum, diffpointstolerance, segmentdivider)
    _create_output_files(duplicate_pairs, every_audio_file, results_csv, results_xlsx, duplicates_csv, duplicates_xlsx)
    # report timing and result count
    elapsed = round(time.time() - start_time, 2)
    print("\nCalculation finished!")
    print("Total calculation time: " + str(elapsed) + " s")
    print("Number of duplicate pairs found: " + str(len(duplicate_pairs)))
def find_duplicates_img_hashing(path, outputpath = None, chroma_method = "cuda", hashdiff_tresh = 10):
    """
    Scan an audio directory for duplicate recordings using perceptual image hashing of chroma bitmaps and write .csv/.xlsx reports.

    :param path: Directory of folder in which to look for duplicates
    :param outputpath: Output directory for chroma features and results; auto-created inside ``path`` when omitted
    :param chroma_method: "cuda" (nnAudio + CUDA) or "synctoolbox" for chroma feature extraction
    :param hashdiff_tresh: Maximum hash difference for which two recordings count as the same
    """
    every_audio_file = list(librosa.util.find_files(path, ext=['mp3', 'mp4', 'ogg', 'wav']))
    if outputpath is None:
        outputpath = os.path.join(path, "DuplicateFinder")
    if not os.path.isdir(outputpath):
        os.mkdir(outputpath)
    results_csv = os.path.join(outputpath, output_filename + ".csv")
    results_xlsx = os.path.join(outputpath, output_filename + ".xlsx")
    duplicates_csv = os.path.join(outputpath, output_filename_duplicates_only + ".csv")
    duplicates_xlsx = os.path.join(outputpath, output_filename_duplicates_only + ".xlsx")
    chroma_folder = os.path.join(outputpath, "chroma_files")
    chroma_imgs_folder = os.path.join(outputpath, "chroma_files_imgs")
    start_time = time.time()
    # extract (or load cached) chroma features, then render them as bitmaps
    _calculate_chromagrams(path, outputpath, chroma_method)
    _export_chromafiles_as_imgs(chroma_folder, chroma_imgs_folder)
    # compare every pair of bitmaps by perceptual hash
    duplicate_pairs = _return_duplicates_img_hashing(path, outputpath, hashdiff_tresh)
    # report timing and result count
    elapsed = round(time.time() - start_time, 2)
    print("\nCalculation finished!")
    print("Total calculation time: " + str(elapsed) + " s")
    print("Number of duplicate pairs found: " + str(len(duplicate_pairs)))
    # write the reports last (matches the original ordering of this variant)
    _create_output_files(duplicate_pairs, every_audio_file, results_csv, results_xlsx, duplicates_csv, duplicates_xlsx)
def find_duplicates_combined(path, outputpath = None, chroma_method = "cuda", hashdiff_tresh = 10, dtwtype ="mrmsdtw", dtwarea = 1000000, verify_extremes = False, testpointsnum = 100, diffpointstolerance = 5, segmentdivider = 4):
    """
    Scan an audio directory for duplicates in two stages: image hashing pre-selects candidate pairs, DTW confirms them; writes .csv/.xlsx reports.

    :param path: Directory of folder in which to look for duplicates
    :param outputpath: Output directory for chroma features and results; auto-created inside ``path`` when omitted
    :param chroma_method: "cuda" (nnAudio + CUDA) or "synctoolbox" for chroma feature extraction
    :param hashdiff_tresh: Maximum hash difference for the image-hashing pre-selection stage
    :param dtwtype: DTW flavour: "mrmsdtw", "dtw" or "fastdtw"
    :param dtwarea: DTW-flavour-specific size parameter (tau for mrmsdtw, radius for fastdtw)
    :param verify_extremes: When True, paths are also evaluated with flipped axes (helps with long silent/noisy intros)
    :param testpointsnum: Number of points tested between the reference points
    :param diffpointstolerance: Percentage of tested points allowed to deviate while still counting as a duplicate
    :param segmentdivider: Where the reference line is anchored (e.g. 4 -> from 1/4 to 3/4 of the warping path)
    """
    every_audio_file = list(librosa.util.find_files(path, ext=['mp3', 'mp4', 'ogg', 'wav']))
    if outputpath is None:
        outputpath = os.path.join(path, "DuplicateFinder")
    if not os.path.isdir(outputpath):
        os.mkdir(outputpath)
    results_csv = os.path.join(outputpath, output_filename + ".csv")
    results_xlsx = os.path.join(outputpath, output_filename + ".xlsx")
    duplicates_csv = os.path.join(outputpath, output_filename_duplicates_only + ".csv")
    duplicates_xlsx = os.path.join(outputpath, output_filename_duplicates_only + ".xlsx")
    chroma_folder = os.path.join(outputpath, "chroma_files")
    chroma_imgs_folder = os.path.join(outputpath, "chroma_files_imgs")
    start_time = time.time()
    # stage 0: extract (or load cached) chroma features and render them as bitmaps
    _calculate_chromagrams(path, outputpath, chroma_method)
    _export_chromafiles_as_imgs(chroma_folder, chroma_imgs_folder)
    # stage 1: cheap image-hashing pass selects candidate pairs
    candidate_pairs = _return_duplicates_img_hashing(path, outputpath, hashdiff_tresh)
    # stage 2: DTW verifies only the candidates from stage 1
    confirmed_pairs = _return_duplicates_dtw(path, outputpath, chroma_method, dtwtype, dtwarea, verify_extremes, testpointsnum, diffpointstolerance, segmentdivider, candidate_pairs)
    _create_output_files(confirmed_pairs, every_audio_file, results_csv, results_xlsx, duplicates_csv, duplicates_xlsx)
    # report timing and result count
    elapsed = round(time.time() - start_time, 2)
    print("\nCalculation finished!")
    print("Total calculation time: " + str(elapsed) + " s")
    print("Number of duplicate pairs found: " + str(len(confirmed_pairs)))
def is_chroma_duplicate(chroma1, chroma2, dtwtype = "mrmsdtw", dtwarea = 1000000, verify_extremes = False, testpointsnum = 100, diffpointstolerance = 5, segmentdivider = 4, showplot=False):
    """
    Check whether two chromagrams correspond to duplicate recordings.

    :param chroma1: Chromagram of the first recording
    :param chroma2: Chromagram of the second recording
    :param dtwtype: DTW flavour: "mrmsdtw", "dtw" or "fastdtw"
    :param dtwarea: DTW-flavour-specific size parameter (tau for mrmsdtw, radius for fastdtw)
    :param verify_extremes: When True and the first check fails, the path is re-evaluated with flipped axes (helps with long noise passages at the start of one recording)
    :param testpointsnum: Number of points tested between the reference points
    :param diffpointstolerance: Percentage of tested points allowed to deviate while still counting as a duplicate
    :param segmentdivider: Where the reference line is anchored (e.g. 4 -> from 1/4 to 3/4 of the warping path)
    :param showplot: When True, display the evaluation plot
    :return: True if the two chromagrams represent the same recording, False otherwise
    :raises ValueError: if ``dtwtype`` is not one of the supported values
    """
    # DTW ---------------------------
    if dtwtype == "mrmsdtw":
        path = mrmsdtw.sync_via_mrmsdtw(chroma1, chroma2, dtw_implementation="librosa", threshold_rec=dtwarea)
        pathx = np.array(path[0,:]) # split the warping path into two separate arrays
        pathy = np.array(path[1,:])
    elif dtwtype == "fastdtw":
        # fastdtw expects (time, feature) ordering, so flip the chroma axes
        chroma1 = np.swapaxes(chroma1, 0, 1)
        chroma2 = np.swapaxes(chroma2, 0, 1)
        distance, path = fastdtw(x = chroma1, y = chroma2, dist = euclidean, radius = dtwarea)
        pathx, pathy = zip(*path[::-1]) # reverse so the path ascends, then split into two sequences
        pathx = np.array(pathx) # convert to numpy arrays
        pathy = np.array(pathy)
    elif dtwtype == "dtw":
        chroma1 = np.swapaxes(chroma1, 0, 1)
        chroma2 = np.swapaxes(chroma2, 0, 1)
        path = dtw(chroma1, chroma2, dist = euclidean)
        pathx = path[3][0] # element 3 of the dtw() result holds the warping path
        pathy = path[3][1]
    else:
        # previously this printed a message and called quit(), killing the host process;
        # raising lets callers handle the bad argument instead
        raise ValueError('dtwtype must be "mrmsdtw", "fastdtw" or "dtw", got: ' + repr(dtwtype))
    issame, plt = _verify_path_flatness(pathx, pathy, testpointsnum = testpointsnum, diffpointstolerance = diffpointstolerance, segmentdivider = segmentdivider)
    if (verify_extremes):
        if (issame == False): # retry with flipped axes, which can rescue cases with a very long noise passage in one file
            plt.clf()
            issame, plt = _verify_path_flatness(pathy, pathx, testpointsnum = testpointsnum, diffpointstolerance = diffpointstolerance, segmentdivider = segmentdivider)
    if (showplot == True):
        plt.show()
    return issame
def is_duplicate(audiofile1_name, audiofile2_name, chroma_method = "cuda", dtwtype = "mrmsdtw", dtwarea = 1000000, verify_extremes = False, testpointsnum = 100, diffpointstolerance = 5, segmentdivider = 4, showplot=False):
    """
    Check whether two audio files are duplicates.

    :param audiofile1_name: Path to the first audio file
    :param audiofile2_name: Path to the second audio file
    :param chroma_method: "cuda" (nnAudio + CUDA) or "synctoolbox" for chroma feature extraction
    :param dtwtype: DTW flavour: "mrmsdtw", "dtw" or "fastdtw"
    :param dtwarea: DTW-flavour-specific size parameter (tau for mrmsdtw, radius for fastdtw)
    :param verify_extremes: When True, paths are also evaluated with flipped axes
    :param testpointsnum: Number of points tested between the reference points
    :param diffpointstolerance: Percentage of tested points allowed to deviate while still counting as a duplicate
    :param segmentdivider: Where the reference line is anchored (e.g. 4 -> from 1/4 to 3/4 of the warping path)
    :param showplot: When True, display the evaluation plot
    :return: True if the two audio files are the same recording, False otherwise
    :raises ValueError: if ``chroma_method`` is not "cuda" or "synctoolbox"
    """
    # validate up front: previously an invalid chroma_method left chroma1/chroma2
    # undefined and crashed with a NameError after both files were already decoded
    if chroma_method not in ("cuda", "synctoolbox"):
        raise ValueError('chroma_method must be "cuda" or "synctoolbox", got: ' + repr(chroma_method))
    audio1, _ = _ffmpeg_load_audio(audiofile1_name, sr = fs, mono = True)
    audio2, _ = _ffmpeg_load_audio(audiofile2_name, sr = fs, mono = True)
    if (chroma_method == "cuda"):
        chroma1 = _calculate_chromagram_cuda(audio1)
        chroma2 = _calculate_chromagram_cuda(audio2)
    else:
        chroma1 = _calculate_chromagram_synctoolbox(audio1)
        chroma2 = _calculate_chromagram_synctoolbox(audio2)
    print("Checking whether files \"" + os.path.basename(audiofile1_name) + "\" and \"" + os.path.basename(audiofile2_name) + "\" are duplicates:")
    isDuplicate = is_chroma_duplicate(chroma1, chroma2, dtwtype, dtwarea, verify_extremes, testpointsnum, diffpointstolerance, segmentdivider, showplot)
    print(isDuplicate)
    return isDuplicate
# Helper functions ----------------------------------------------------------------------------------------------
# function for rewriting cells in .csv
def _csvaddtocell(csvdir, row, column, value):
f = open(csvdir, 'r', encoding = "utf-8")
reader = csv.reader(f)
mylist = list(reader)
f.close()
if(len(mylist[row][column]) == 0):
mylist[row][column] = str(value)
else:
mylist[row][column] = str(mylist[row][column]) + ", " + str(value)
mylistnew = open(csvdir, 'w', newline='', encoding="utf-8")
csv_writer = csv.writer(mylistnew)
csv_writer.writerows(mylist)
mylistnew.close()
# function that appends list to .csv
def _append_row_csv(csvdir, list):
with open(csvdir, 'a', newline='') as f_object:
# Pass the CSV file object to the writer() function
writer_object = writer(f_object)
# Result - a writer object
# Pass the data in the list as an argument into the writerow() function
writer_object.writerow(list)
# Close the file object
f_object.close()
# function that exports csv data to xlsx
def convert_csv_to_xlsx(csvfile, xlsxfile):
    """Copy every row of the CSV file at *csvfile* into a new Excel workbook saved at *xlsxfile*."""
    workbook = openpyxl.Workbook()
    sheet = workbook.active
    with open(csvfile, 'r', encoding = "utf-8") as source:
        reader = csv.reader(source)
        for record in reader:
            sheet.append(record)
    workbook.save(xlsxfile)
# function for audio file loading using FFMPEG
def _ffmpeg_load_audio(filename, sr=44100, mono=False, normalize=True, in_type=np.int16, out_type=np.float32):
    """
    Decode an audio file to a numpy array by piping raw PCM from an ffmpeg subprocess.

    :param filename: Path to the input audio file
    :param sr: Target sample rate passed to ffmpeg
    :param mono: Downmix to one channel when True, otherwise two channels
    :param normalize: For float output, scale by the peak absolute value; when False, integer input is scaled by the integer type's max instead
    :param in_type: numpy dtype of the raw PCM samples requested from ffmpeg
    :param out_type: numpy dtype of the returned array
    :return: (audio, sr); audio has shape (samples,) for mono and (channels, samples) otherwise
    """
    channels = 1 if mono else 2
    format_strings = {
        np.float64: 'f64le',
        np.float32: 'f32le',
        np.int16: 's16le',
        np.int32: 's32le',
        np.uint32: 'u32le'
    }
    format_string = format_strings[in_type]
    command = [
        'ffmpeg',
        '-i', filename,
        '-f', format_string,
        '-acodec', 'pcm_' + format_string,
        '-ar', str(sr),
        '-ac', str(channels),
        '-']
    # shell=False (the default): the command is an argument list, and the original
    # shell=True would hand only 'ffmpeg' to the shell and drop every argument on POSIX
    p = sp.Popen(command, stdout=sp.PIPE, stderr=DEVNULL, bufsize=4096)
    bytes_per_sample = np.dtype(in_type).itemsize
    frame_size = bytes_per_sample * channels
    chunk_size = frame_size * sr # read in 1-second chunks
    chunks = []
    with p.stdout as stdout:
        while True:
            data = stdout.read(chunk_size)
            if not data:
                break
            chunks.append(data)
    p.wait() # reap the child process so it does not linger as a zombie
    raw = b''.join(chunks) # join once instead of quadratic bytes concatenation
    # np.fromstring on binary data is deprecated/removed in modern numpy;
    # frombuffer is the supported equivalent (astype() produces a writable copy)
    audio = np.frombuffer(raw, dtype=in_type).astype(out_type)
    if channels > 1:
        audio = audio.reshape((-1, channels)).transpose()
    if audio.size == 0:
        return audio, sr
    if issubclass(out_type, np.floating):
        if normalize:
            peak = np.abs(audio).max()
            if peak > 0:
                audio /= peak
        # note: this elif pairs with "if normalize" — integer rescaling only
        # happens when normalize is False (original behavior, preserved)
        elif issubclass(in_type, np.integer):
            audio /= np.iinfo(in_type).max
    return audio, sr
# function for output files creating
def _create_output_files(duplicatepairslist, filedirs_all, csvfiledir, excelfiledir, csvfiledir_duplicates_only, excelfiledir_duplicates_only):
    """
    Write duplicate-pair results into the main CSV/XLSX report and a fresh duplicates-only CSV/XLSX report.

    :param duplicatepairslist: List of [path1, path2] duplicate pairs
    :param filedirs_all: List of all audio file paths (defines the row order of the main report)
    :param csvfiledir: Path of the main CSV report (must already contain one row per file plus a header)
    :param excelfiledir: Path of the main XLSX report to generate
    :param csvfiledir_duplicates_only: Path of the duplicates-only CSV report (recreated from scratch)
    :param excelfiledir_duplicates_only: Path of the duplicates-only XLSX report to generate
    """
    # rebuild the duplicates-only report from scratch
    if os.path.exists(csvfiledir_duplicates_only):
        os.remove(csvfiledir_duplicates_only)
    _append_row_csv(csvfiledir_duplicates_only, ["File 1 directory", "File 1 name", "File 2 directory", "File 2 name"])
    for first_path, second_path in duplicatepairslist:
        first_name = os.path.basename(first_path)
        second_name = os.path.basename(second_path)
        first_idx = filedirs_all.index(first_path)
        second_idx = filedirs_all.index(second_path)
        # cross-reference the pair in both rows of the main report (+1 skips the header row)
        _csvaddtocell(csvfiledir, first_idx + 1, 3, second_name)
        _csvaddtocell(csvfiledir, first_idx + 1, 4, second_idx)
        _csvaddtocell(csvfiledir, second_idx + 1, 3, first_name)
        _csvaddtocell(csvfiledir, second_idx + 1, 4, first_idx)
        # record the pair in the duplicates-only report
        _append_row_csv(csvfiledir_duplicates_only, [first_path, first_name, second_path, second_name])
    # mirror both CSV reports as Excel workbooks
    convert_csv_to_xlsx(csvfiledir, excelfiledir)
    convert_csv_to_xlsx(csvfiledir_duplicates_only, excelfiledir_duplicates_only)
# function that returns nearest value to the input value from array
def _find_nearest(array, value):
index = np.abs(array - value).argmin()
return array.flat[index]
# chroma features calculation functions --------------------------------------------------------------------------
# function for chroma features calculation using CUDA and nnAudio
def _calculate_chromagram_cuda(audio):
    """
    Compute a chromagram from a mono audio signal via an nnAudio CQT layer (GPU when available).

    :param audio: 1-D array of audio samples (expected at the module-level sample rate ``fs``)
    :return: chromagram as returned by ``librosa.feature.chroma_cqt``
    """
    # pick the GPU when available; the cached spec_layer is created on the matching device below
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    global spec_layer
    # lazily build the CQT layer once and reuse it across calls (module-level cache)
    if (spec_layer == None):
        if (torch.cuda.is_available()):
            spec_layer = features.CQT(sr=fs, hop_length=512).cuda()
        else:
            spec_layer = features.CQT(sr=fs, hop_length=512).cpu()
    # compute the CQT spectrogram with nnAudio, then hand it to librosa
    audio = torch.tensor(audio, device=device).float() # cast the array into a PyTorch tensor
    cqt = spec_layer(audio)
    cqt = cqt.cpu().detach().numpy()[0] # back to numpy; [0] drops the batch dimension
    # derive the chromagram from the precomputed CQT
    chroma = librosa.feature.chroma_cqt(C=cqt, sr=fs, hop_length=512)
    return chroma
# function for chroma features calculation using synctoolbox
def _calculate_chromagram_synctoolbox(audio):
    """Compute quantized chroma features for *audio* with the synctoolbox pipeline (no CUDA required)."""
    pitch_features = audio_to_pitch_features(audio, Fs = fs)
    chroma = pitch_to_chroma(f_pitch=pitch_features)
    return quantize_chroma(f_chroma=chroma)
# function for chroma features calculation of all files from defined path
def _calculate_chromagrams(path, outputpath, chroma_method):
    """
    Extract chroma features for every audio file under *path* (skipping already-cached .npy files)
    and initialize the results CSV with one row per file.

    :param path: Root directory to scan for audio files
    :param outputpath: Output directory; chroma features are cached under ``outputpath/chroma_files``
    :param chroma_method: "cuda" (nnAudio + CUDA) or "synctoolbox" for chroma feature extraction
    :raises ValueError: if ``chroma_method`` is not "cuda" or "synctoolbox"
    """
    # collect only folders that directly contain audio files, skipping the tool's own output folders
    audiofolderlist = []
    for folder in [x[0] for x in os.walk(path)]:
        if "chroma_files" in folder or "DuplicateFinder" in folder:
            continue
        if len(librosa.util.find_files(folder, ext=['mp3', 'mp4', 'ogg', 'wav'], recurse = False)) == 0:
            continue
        audiofolderlist.append(folder)
    filesnumber = len(librosa.util.find_files(path, ext=['mp3', 'mp4', 'ogg', 'wav'])) # total file count, used only for progress output
    csvfiledir = os.path.join(outputpath, output_filename + ".csv")
    header = ['File ID', 'File directory', 'File name', 'Duplicate file names', 'Duplicate IDs']
    currentfilenum = 0
    # with-block replaces the original manual open()/close(); the unused 'excelrow'
    # local and the redundant per-folder index variable were removed
    with open(csvfiledir, 'w', encoding="utf-8", newline='') as csvfile:
        csvwriter = csv.writer(csvfile)
        csvwriter.writerow(header)
        for folder in audiofolderlist:
            folderreldir = os.path.relpath(folder, path)
            chromapath = os.path.join(outputpath, "chroma_files", folderreldir)
            # create the chroma cache directory for this folder if needed
            if not os.path.isdir(chromapath):
                os.makedirs(chromapath)
            for audiofile in librosa.util.find_files(folder, ext=['mp3', 'mp4', 'ogg', 'wav'], recurse = False):
                filename = os.path.basename(audiofile)
                chromafilepath = os.path.join(chromapath, filename + ".npy")
                # only compute chroma features that are not cached yet
                if not os.path.exists(chromafilepath):
                    print("Extracting chroma features from file \"" + os.path.basename(audiofile) + "\" (" + str(currentfilenum+1) + "/" + str(filesnumber) + ")")
                    wave, _ = _ffmpeg_load_audio(audiofile, sr=fs, mono=True)
                    if chroma_method == "cuda":
                        chroma = _calculate_chromagram_cuda(wave)
                    elif chroma_method == "synctoolbox":
                        chroma = _calculate_chromagram_synctoolbox(wave)
                    else:
                        # previously an invalid method crashed later with a NameError on 'chroma'
                        raise ValueError('chroma_method must be "cuda" or "synctoolbox", got: ' + repr(chroma_method))
                    print("\n")
                    np.save(chromafilepath, chroma)
                else:
                    print("Chroma features corresponding to file: \"" + os.path.basename(audiofile) + "\" have been loaded!" + " (" + str(currentfilenum+1) + "/" + str(filesnumber) + ")")
                csvwriter.writerow([currentfilenum, audiofile, filename, "", ""])
                currentfilenum = currentfilenum + 1
    print("\nChroma features have been successfuly extracted from all audio files!\n")
# function that evaluates DTW path flatness (whether two recordings are same or not)
def _verify_path_flatness(pathx, pathy, testpointsnum, diffpointstolerance, segmentdivider):
    """
    Decide whether a DTW warping path is approximately a straight line, i.e. whether
    the two recordings behind it are duplicates.

    A straight reference line is fitted through two points of the path (at
    1/segmentdivider and (segmentdivider-1)/segmentdivider of its x-range) and up to
    *testpointsnum* intermediate path points are compared against that line; points
    deviating by more than 1.2 count as mismatches.

    :param pathx: x coordinates of the warping path
    :param pathy: y coordinates of the warping path
    :param testpointsnum: number of points tested between the reference points
    :param diffpointstolerance: percentage of tested points allowed to deviate while still reporting a match
    :param segmentdivider: controls where the reference points sit on the path
    :return: (issame, plt) — the verdict and the matplotlib pyplot module holding the accumulated plot state
    """
    # determination of sample numbers for line approximation
    pathxminval = min(pathx) # determination of min and max values (start and beginning of line on x axis)
    pathxmaxval = max(pathx)
    pathxvalrange = pathxmaxval - pathxminval
    # makes sure that the range is divisible by the segmentdivider value
    modulo = pathxvalrange % segmentdivider
    pathxvalrange = pathxvalrange - modulo
    refpoint1xval = int(pathxminval+(pathxvalrange/segmentdivider)) # determination of point x value for approximation
    refpoint2xval = int(pathxminval+(pathxvalrange/segmentdivider)*(segmentdivider-1))
    refpoint1xpos = int(np.argwhere(pathx==refpoint1xval)[0]) # finds out positions of array pathx at which these points are located
    refpoint2xpos = int(np.argwhere(pathx==refpoint2xval)[0])
    refpointsx = np.array([pathx[refpoint1xpos], pathx[refpoint2xpos]]) # creates arrays with x and y coordinates in format suitable for np.polyfit
    refpointsy = np.array([pathy[refpoint1xpos], pathy[refpoint2xpos]])
    # Line approximation: degree-1 polyfit through the two reference points
    coefficients = np.polyfit(refpointsx, refpointsy, 1)
    polynomial = np.poly1d(coefficients)
    linex = np.arange(start=0, stop=len(pathx), step=1)
    liney = polynomial(linex)
    # verifies whether the path between the two points used to approximate the curve actually lies on the curve
    refpointsvaldiff = refpoint2xval - refpoint1xval
    if (testpointsnum > refpointsvaldiff): # ensures that the number of test points does not exceed the number of defined points between the reference points except the reference points themselves
        testpointsnum = refpointsvaldiff - 1
    testpointstep = refpointsvaldiff / testpointsnum # step size
    # Testing of points ---------------------------------
    testpointxshift = testpointstep/2 # variable that ensures that the first test point is not at the point where the curve intersects path
    diffpointsnum = 0 # a variable to which one is added in the cycle iteration if the point values do not fit
    for i in range(0, testpointsnum, 1): # cycle iterating across individual testing points
        testpointxval = refpoint1xval + i*testpointstep + testpointxshift # finding the value for testing
        testpointnearestxval = _find_nearest(pathx, testpointxval) # finding the nearest value to the test value
        testpointxpos = np.argwhere(pathx==testpointnearestxval)[0] # finding the first index of the path element, which is equal to the given test position (index of pathx does not always match the value !!)
        pathval = float(pathy[testpointxpos]) # finding the value that matches the index found in the previous step
        lineval = float(liney[int(testpointnearestxval)]) # finding the value of the tested position at the approximation curve (here the index is always equal to the value)
        if (abs(pathval-lineval)>1.2): # if the difference between the values is greater than 1.2, it gets written to diffpointsnum
            diffpointsnum = diffpointsnum+1
            plt.plot(testpointnearestxval, pathval, '+', markersize=12, color='red') # mismatching point plotted in red
        else:
            plt.plot(testpointnearestxval, pathval, '+', markersize=12, color='green') # matching point plotted in green
    diffpointsnumtolerance = round((diffpointstolerance/100) * testpointsnum) # absolute number of tolerated mismatches
    # Evaluation, if two recordings are the same ---------
    if (diffpointsnum <= diffpointsnumtolerance):
        issame = True
    else:
        issame = False
    # PLOTTING: full path, fitted line segment, and the two reference points
    plt.plot(pathx, pathy, color="black")
    plt.plot(linex[refpoint1xval:refpoint2xval+1], liney[refpoint1xval:refpoint2xval+1])
    plt.plot(refpointsx, refpointsy, 'o', markersize=8, color='blue')
    return issame, plt
# chromagram to image functions ----------------------------------------------------------------------------------
def _load_chromafile_and_save_as_img(chroma_filename, output_filename):
    """
    Load a saved chroma feature matrix (.npy) and write it out as an RGB bitmap.

    :param chroma_filename: Path to the .npy chroma file
    :param output_filename: Path of the image file to write
    """
    chroma = np.load(chroma_filename)
    # scale values into the 8-bit range; the original multiplied by 256, which made
    # bins with value exactly 1.0 wrap around to 0 under uint8 conversion
    chromaprocessed = np.clip(chroma * 255.0, 0, 255).astype(np.uint8)
    im = Image.fromarray(chromaprocessed)
    if im.mode != 'RGB':
        im = im.convert('RGB')
    im.save(output_filename)
# function that takes folder of chromagrams as input and exports them as bitmap into defined output folder
def _export_chromafiles_as_imgs(chroma_folder, output_folder):
    """
    Render every saved chroma .npy under *chroma_folder* as a .png bitmap inside
    *output_folder*, mirroring the source folder structure and skipping images
    that already exist.
    """
    # make sure the image output root exists
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)
    # gather every chroma file from all subdirectories
    chroma_paths = []
    for dirpath, dirnames, filenames in os.walk(chroma_folder):
        for name in filenames:
            chroma_paths.append(os.path.join(dirpath, name))
    print("Saving chroma features as bitmaps for image hashing")
    for chroma_file in chroma_paths:
        source = os.path.realpath(chroma_file)
        relative = os.path.relpath(source, chroma_folder)
        target = os.path.join(output_folder, relative.replace('.npy', '') + ".png")
        # mirror the subdirectory layout under the output folder
        target_dir = os.path.dirname(target)
        if not os.path.isdir(target_dir):
            os.makedirs(target_dir)
        # only render images that do not exist yet
        if not os.path.exists(target):
            _load_chromafile_and_save_as_img(source, target)
# function that evaluates, whether two images are similar according to set hash difference treshold
def _are_imgs_similar(file1, file2, hashdiff_tresh):
    """
    Return True when the perceptual hashes of the two images differ by at most *hashdiff_tresh*.

    :param file1: Path to the first image
    :param file2: Path to the second image
    :param hashdiff_tresh: Maximum allowed hash difference for a match
    """
    # context managers close the image file handles, which Image.open left dangling
    with Image.open(file1) as img1:
        hash1 = imagehash.phash(img1)
    with Image.open(file2) as img2:
        hash2 = imagehash.phash(img2)
    # return the comparison directly instead of the redundant if/else True/False
    return abs(hash1 - hash2) <= hashdiff_tresh
# duplicate finding helper functions -----------------------------------------------------------------------------
# function that returns list of found duplicates in defined path using DTW method
def _return_duplicates_dtw(path, outputpath, chroma_method = "cuda", dtwtype ="mrmsdtw", dtwarea = 1000000, verify_extremes = False, testpointsnum = 100, diffpointstolerance = 5, segmentdivider = 4, filepairslist = None):
    """
    Compare candidate file pairs with DTW on their cached chroma features and return the duplicate pairs.

    :param path: Root directory of the audio files
    :param outputpath: Output directory holding the ``chroma_files`` cache
    :param chroma_method: Accepted for interface compatibility; not used in this function
    :param dtwtype: DTW flavour: "mrmsdtw", "dtw" or "fastdtw"
    :param dtwarea: DTW-flavour-specific size parameter (tau for mrmsdtw, radius for fastdtw)
    :param verify_extremes: When True, paths are also evaluated with flipped axes
    :param testpointsnum: Number of points tested between the reference points
    :param diffpointstolerance: Percentage of tested points allowed to deviate while still counting as a duplicate
    :param segmentdivider: Where the reference line is anchored on the warping path
    :param filepairslist: Optional pre-selected candidate pairs; when None, every unordered pair is tested
    :return: List of [path1, path2] duplicate pairs
    """
    # (the original also built an unused 'filenames' list via os.walk — removed as dead code)
    filedirs_all = librosa.util.find_files(path, ext=['mp3', 'mp4', 'ogg', 'wav']) # paths to all audio files (every subdirectory)
    filesnumber = len(filedirs_all)
    # when no candidate list is supplied, test every unordered pair of files
    if filepairslist is None: # 'is None' instead of '== None' — the argument may be array-like
        filepairslist = []
        for i in range(0, filesnumber):
            for j in range(i+1, filesnumber):
                filepairslist.append([filedirs_all[i], filedirs_all[j]])
    numofpairs = len(filepairslist)
    duplicatepairslist = []
    currentpairnum = 1
    # check every candidate pair
    for filepair in filepairslist:
        file1dir = str(filepair[0])
        file2dir = str(filepair[1])
        file1reldir = os.path.dirname(os.path.relpath(file1dir, path)) # directory of the file relative to the scanned root
        file2reldir = os.path.dirname(os.path.relpath(file2dir, path))
        file1name = os.path.basename(filepair[0]) # filename with extension only
        file2name = os.path.basename(filepair[1])
        chroma1dir = os.path.join(outputpath, "chroma_files", file1reldir, file1name + ".npy")
        chroma2dir = os.path.join(outputpath, "chroma_files", file2reldir, file2name + ".npy")
        print("Using DTW to check whether files \"" + file1name + "\" and \"" + file2name + "\" are duplicates" + " (" + str(currentpairnum) + "/" + str(numofpairs) + ")")
        chroma1 = np.load(chroma1dir)
        chroma2 = np.load(chroma2dir)
        isduplicate = is_chroma_duplicate(chroma1, chroma2, dtwtype, dtwarea, verify_extremes, testpointsnum, diffpointstolerance, segmentdivider)
        print(isduplicate)
        if (isduplicate):
            duplicatepairslist.append([file1dir, file2dir])
        currentpairnum = currentpairnum + 1
    return duplicatepairslist
# function that returns list of found duplicates in defined path using image hashing
def _return_duplicates_img_hashing(path, outputpath, hashdiff_tresh = 10):
    """
    Compare every unordered pair of files by perceptual hashing of their chroma bitmaps and return the duplicate pairs.

    :param path: Root directory of the audio files
    :param outputpath: Output directory holding the ``chroma_files_imgs`` cache
    :param hashdiff_tresh: Maximum hash difference for which two recordings count as the same
    :return: List of [path1, path2] duplicate pairs
    """
    # (the original also built an unused 'filenames' list via os.walk — removed as dead code)
    filedirs_all = librosa.util.find_files(path, ext=['mp3', 'mp4', 'ogg', 'wav']) # paths to all audio files (every subdirectory)
    filesnumber = len(filedirs_all)
    # build every unordered pair of files
    filepairslist = []
    for i in range(0, filesnumber):
        for j in range(i+1, filesnumber):
            filepairslist.append([filedirs_all[i], filedirs_all[j]])
    numofpairs = len(filepairslist)
    duplicatepairslist = []
    currentpairnum = 1
    # check every pair for perceptual similarity
    for filepair in filepairslist:
        file1dir = str(filepair[0])
        file2dir = str(filepair[1])
        file1reldir = os.path.dirname(os.path.relpath(file1dir, path)) # directory of the file relative to the scanned root
        file2reldir = os.path.dirname(os.path.relpath(file2dir, path))
        file1name = os.path.basename(filepair[0]) # filename with extension only
        file2name = os.path.basename(filepair[1])
        chromaimg1dir = os.path.join(outputpath, "chroma_files_imgs", file1reldir, file1name + ".png")
        chromaimg2dir = os.path.join(outputpath, "chroma_files_imgs", file2reldir, file2name + ".png")
        print("Using image hashing to check whether files \"" + file1name + "\" and \"" + file2name + "\" are similar" + " (" + str(currentpairnum) + "/" + str(numofpairs) + ")")
        isduplicate = _are_imgs_similar(chromaimg1dir, chromaimg2dir, hashdiff_tresh)
        print(isduplicate)
        if (isduplicate):
            duplicatepairslist.append([file1dir, file2dir])
        currentpairnum = currentpairnum + 1
    return duplicatepairslist