super messy fft based detector
This commit is contained in:
parent
5f6ba13bfc
commit
cf8a9dcfc1
140
scripts/selcal-fft.py
Executable file
140
scripts/selcal-fft.py
Executable file
@ -0,0 +1,140 @@
|
|||||||
|
#! /usr/bin/env python3
|
||||||
|
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
from scipy.io import wavfile
|
||||||
|
from scipy.fft import fft
|
||||||
|
from scipy.signal import butter, lfilter, decimate
|
||||||
|
import matplotlib.pyplot as plt
|
||||||
|
import sys
|
||||||
|
|
||||||
|
frequencies = np.array([10 ** ((n + 22) * 0.0225 + 2) for n in range(32)])
|
||||||
|
widths = np.array([6.25 * 10 ** (n * 0.0225) for n in range(32)])
|
||||||
|
widths = np.array([6.25 for n in range(32)])
|
||||||
|
|
||||||
|
def butter_bandpass(lowcut, highcut, fs, order=5):
|
||||||
|
nyquist = 0.5 * fs
|
||||||
|
low = lowcut / nyquist
|
||||||
|
high = highcut / nyquist
|
||||||
|
b, a = butter(order, [low, high], btype='band')
|
||||||
|
return b, a
|
||||||
|
|
||||||
|
def bandpass_filter(data, lowcut, highcut, fs, order=5):
|
||||||
|
b, a = butter_bandpass(lowcut, highcut, fs, order=order)
|
||||||
|
y = lfilter(b, a, data)
|
||||||
|
return y
|
||||||
|
|
||||||
|
def butter_lowpass(cutoff, fs, order=5):
|
||||||
|
nyquist = 0.5 * fs
|
||||||
|
normal_cutoff = cutoff / nyquist
|
||||||
|
b, a = butter(order, normal_cutoff, btype='low', analog=False)
|
||||||
|
return b, a
|
||||||
|
|
||||||
|
def lowpass_filter(data, cutoff, fs, order=5):
|
||||||
|
b, a = butter_lowpass(cutoff, fs, order=order)
|
||||||
|
y = lfilter(b, a, data)
|
||||||
|
return y
|
||||||
|
|
||||||
|
def get_largest_two_indices(numbers, threshold):
|
||||||
|
# Check if the list has at least three numbers
|
||||||
|
if len(numbers) < 3:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Find the indices of the three largest numbers in the list
|
||||||
|
indices = sorted(range(len(numbers)), key=lambda i: numbers[i], reverse=True)
|
||||||
|
largest_index = indices[0]
|
||||||
|
second_largest_index = indices[1]
|
||||||
|
third_largest_index = indices[2]
|
||||||
|
|
||||||
|
# Check if the largest and second largest numbers are at least threshold larger than the third largest
|
||||||
|
if numbers[largest_index] - numbers[third_largest_index] >= threshold and \
|
||||||
|
numbers[second_largest_index] - numbers[third_largest_index] >= threshold:
|
||||||
|
return largest_index, second_largest_index
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Step 1: Read the WAV file
|
||||||
|
sample_rate, data = wavfile.read(sys.argv[1])
|
||||||
|
|
||||||
|
# Handle stereo audio by converting to mono if needed
|
||||||
|
if len(data.shape) == 2:
|
||||||
|
data = data.mean(axis=1)
|
||||||
|
|
||||||
|
# Define the maximum frequency of interest and Nyquist rate
|
||||||
|
max_freq = 1600.0 # 2000 Hz
|
||||||
|
nyquist_rate = 2 * max_freq # Nyquist rate to prevent aliasing
|
||||||
|
|
||||||
|
# If the sample rate is higher than the Nyquist rate, downsample
|
||||||
|
if sample_rate > nyquist_rate:
|
||||||
|
# Apply a lowpass filter before downsampling
|
||||||
|
cutoff = max_freq #+ 500 # Lowpass filter cutoff slightly above max_freq to prevent aliasing
|
||||||
|
filtered_data = lowpass_filter(data, cutoff, sample_rate)
|
||||||
|
|
||||||
|
# Calculate the downsampling factor
|
||||||
|
downsample_factor = int(sample_rate / nyquist_rate)
|
||||||
|
|
||||||
|
# Downsample the filtered signal
|
||||||
|
filtered_data = decimate(filtered_data, downsample_factor)
|
||||||
|
sample_rate = sample_rate // downsample_factor
|
||||||
|
else:
|
||||||
|
filtered_data = data
|
||||||
|
|
||||||
|
# Step 2: Define the increment (0.1 seconds) and segment size
|
||||||
|
#increment = 0.2 # in seconds
|
||||||
|
#segment_size = int(sample_rate * increment)
|
||||||
|
segment_size = 1024
|
||||||
|
increment = segment_size / sample_rate
|
||||||
|
print(f"Segment size: {segment_size}")
|
||||||
|
|
||||||
|
# Step 3: Process each segment
|
||||||
|
num_segments = len(filtered_data) // segment_size
|
||||||
|
|
||||||
|
# Calculate the frequency resolution
|
||||||
|
delta_f = sample_rate / segment_size
|
||||||
|
|
||||||
|
# Determine the bin range for desired frequency range (100 Hz to 2000 Hz)
|
||||||
|
high_bin = int(max_freq / delta_f)
|
||||||
|
|
||||||
|
# Initialize a 2D array to store DFT results (magnitude spectrum)
|
||||||
|
# Only store the bins within the desired frequency range
|
||||||
|
dft_results = np.zeros((num_segments, high_bin))
|
||||||
|
|
||||||
|
for i in range(num_segments):
|
||||||
|
start = i * segment_size
|
||||||
|
end = start + segment_size
|
||||||
|
segment = filtered_data[start:end]
|
||||||
|
|
||||||
|
# Step 4: Apply the DFT
|
||||||
|
dft_result = fft(segment)
|
||||||
|
|
||||||
|
magnitudes = np.abs(dft_result)
|
||||||
|
total_energy = np.sum(magnitudes ** 2)
|
||||||
|
normalized_magnitudes = magnitudes / np.sqrt(total_energy)
|
||||||
|
normalized_magnitude = np.mean(normalized_magnitudes)
|
||||||
|
|
||||||
|
# Store the magnitude spectrum in the 2D array, only for the desired frequency range
|
||||||
|
dft_results[i, :] = normalized_magnitudes[:high_bin]
|
||||||
|
|
||||||
|
scores = [
|
||||||
|
10 * np.log10(np.sum(normalized_magnitudes[int((f-w)/delta_f):int((f+w)/delta_f)]))
|
||||||
|
for f,w in zip(frequencies, widths)
|
||||||
|
]
|
||||||
|
|
||||||
|
codes = get_largest_two_indices(scores, 3.0)
|
||||||
|
if codes:
|
||||||
|
print([frequencies[code] for code in codes])
|
||||||
|
|
||||||
|
|
||||||
|
# Step 5: Plot the spectrogram
|
||||||
|
plt.figure(figsize=(12, 8))
|
||||||
|
extent = [0, num_segments * increment, 0, max_freq]
|
||||||
|
plt.imshow(dft_results.T, aspect='auto', origin='lower', extent=extent, cmap='viridis')
|
||||||
|
plt.colorbar(label='Magnitude')
|
||||||
|
plt.title('Spectrogram (100 Hz to 2000 Hz)')
|
||||||
|
plt.xlabel('Time (s)')
|
||||||
|
plt.ylabel('Frequency (Hz)')
|
||||||
|
|
||||||
|
|
||||||
|
for freq in frequencies:
|
||||||
|
plt.axhline(y=freq, color='r', linestyle='--', linewidth=0.5)
|
||||||
|
plt.show()
|
||||||
Loading…
Reference in New Issue
Block a user