Compare commits
5 Commits
8aecf684f9
...
e4b0645ac6
| Author | SHA1 | Date | |
|---|---|---|---|
| e4b0645ac6 | |||
| 700f34e729 | |||
| eb2926f525 | |||
| e2be6823bd | |||
| 77441d8b7c |
3
scripts/.gitignore
vendored
3
scripts/.gitignore
vendored
@ -1 +1,2 @@
|
||||
*.wav
|
||||
*.wav
|
||||
__pycache__/
|
||||
42
scripts/filters.py
Normal file
42
scripts/filters.py
Normal file
@ -0,0 +1,42 @@
|
||||
|
||||
from scipy.signal import butter, lfilter, decimate
|
||||
|
||||
def anti_alias(data, sample_rate, max_frequency):
|
||||
nyquist_rate = 2 * max_frequency
|
||||
|
||||
if sample_rate > nyquist_rate:
|
||||
filtered_data = lowpass_filter(data, max_frequency, sample_rate)
|
||||
|
||||
downsample_factor = int(sample_rate / nyquist_rate)
|
||||
|
||||
filtered_data = decimate(filtered_data, downsample_factor)
|
||||
sample_rate = sample_rate // downsample_factor
|
||||
else:
|
||||
filtered_data = data
|
||||
|
||||
return filtered_data, sample_rate
|
||||
|
||||
# These originally came from https://scipy.github.io/old-wiki/pages/Cookbook/ButterworthBandpass,
|
||||
# but they've been copied around the internet so many times that ChatGPT now produces them verbatim.
|
||||
def butter_bandpass(lowcut, highcut, fs, order=5):
|
||||
nyquist = 0.5 * fs
|
||||
low = lowcut / nyquist
|
||||
high = highcut / nyquist
|
||||
b, a = butter(order, [low, high], btype='band')
|
||||
return b, a
|
||||
|
||||
def bandpass_filter(data, lowcut, highcut, fs, order=5):
|
||||
b, a = butter_bandpass(lowcut, highcut, fs, order=order)
|
||||
y = lfilter(b, a, data)
|
||||
return y
|
||||
|
||||
def butter_lowpass(cutoff, fs, order=5):
|
||||
nyquist = 0.5 * fs
|
||||
normal_cutoff = cutoff / nyquist
|
||||
b, a = butter(order, normal_cutoff, btype='low', analog=False)
|
||||
return b, a
|
||||
|
||||
def lowpass_filter(data, cutoff, fs, order=5):
|
||||
b, a = butter_lowpass(cutoff, fs, order=order)
|
||||
y = lfilter(b, a, data)
|
||||
return y
|
||||
@ -1,147 +1,125 @@
|
||||
#! /usr/bin/env python3
|
||||
|
||||
|
||||
import numpy as np
|
||||
from scipy.io import wavfile
|
||||
from scipy.fft import fft
|
||||
from scipy.signal import butter, lfilter, decimate
|
||||
from filters import anti_alias
|
||||
from tones import TONES
|
||||
import matplotlib.pyplot as plt
|
||||
import sys
|
||||
|
||||
frequencies = np.array([10 ** ((n + 22) * 0.0225 + 2) for n in range(32)])
|
||||
widths = np.array([6.25 * 10 ** (n * 0.0225) for n in range(32)])
|
||||
widths = np.array([6.25 for n in range(32)])
|
||||
file_name = sys.argv[1]
|
||||
|
||||
def butter_bandpass(lowcut, highcut, fs, order=5):
|
||||
nyquist = 0.5 * fs
|
||||
low = lowcut / nyquist
|
||||
high = highcut / nyquist
|
||||
b, a = butter(order, [low, high], btype='band')
|
||||
return b, a
|
||||
|
||||
def bandpass_filter(data, lowcut, highcut, fs, order=5):
|
||||
b, a = butter_bandpass(lowcut, highcut, fs, order=order)
|
||||
y = lfilter(b, a, data)
|
||||
return y
|
||||
def decibels(f):
|
||||
return 10 * np.log10(f)
|
||||
|
||||
def butter_lowpass(cutoff, fs, order=5):
|
||||
nyquist = 0.5 * fs
|
||||
normal_cutoff = cutoff / nyquist
|
||||
b, a = butter(order, normal_cutoff, btype='low', analog=False)
|
||||
return b, a
|
||||
def find_top_two_keys(d, threshold):
|
||||
sorted_items = sorted(d.items(), key=lambda item: item[1], reverse=True)
|
||||
|
||||
def lowpass_filter(data, cutoff, fs, order=5):
|
||||
b, a = butter_lowpass(cutoff, fs, order=order)
|
||||
y = lfilter(b, a, data)
|
||||
return y
|
||||
|
||||
def get_largest_two_indices(numbers, threshold):
|
||||
# Check if the list has at least three numbers
|
||||
if len(numbers) < 3:
|
||||
if len(sorted_items) < 3:
|
||||
return None
|
||||
|
||||
# Find the indices of the three largest numbers in the list
|
||||
indices = sorted(range(len(numbers)), key=lambda i: numbers[i], reverse=True)
|
||||
largest_index = indices[0]
|
||||
second_largest_index = indices[1]
|
||||
third_largest_index = indices[2]
|
||||
|
||||
# Check if the largest and second largest numbers are at least threshold larger than the third largest
|
||||
if numbers[largest_index] - numbers[third_largest_index] >= threshold and \
|
||||
numbers[second_largest_index] - numbers[third_largest_index] >= threshold:
|
||||
return largest_index, second_largest_index
|
||||
else:
|
||||
top_two = sorted_items[:2]
|
||||
third_value = sorted_items[2][1]
|
||||
if top_two[0][1] - third_value < threshold or top_two[1][1] - third_value < threshold:
|
||||
return None
|
||||
|
||||
return top_two[0][0], top_two[1][0]
|
||||
|
||||
# Step 1: Read the WAV file
|
||||
sample_rate, data = wavfile.read(sys.argv[1])
|
||||
sample_rate, data = wavfile.read(file_name)
|
||||
|
||||
# Handle stereo audio by converting to mono if needed
|
||||
if len(data.shape) == 2:
|
||||
data = data.mean(axis=1)
|
||||
|
||||
# Define the maximum frequency of interest and Nyquist rate
|
||||
max_freq = 1600.0 # 2000 Hz
|
||||
nyquist_rate = 2 * max_freq # Nyquist rate to prevent aliasing
|
||||
max_freq = 1600.0
|
||||
data, sample_rate = anti_alias(data, sample_rate, max_freq)
|
||||
|
||||
# If the sample rate is higher than the Nyquist rate, downsample
|
||||
if sample_rate > nyquist_rate:
|
||||
# Apply a lowpass filter before downsampling
|
||||
cutoff = max_freq #+ 500 # Lowpass filter cutoff slightly above max_freq to prevent aliasing
|
||||
filtered_data = lowpass_filter(data, cutoff, sample_rate)
|
||||
fft_size = 1024 # Must be larger than max_freq TODO JMT: fix this, zero-pad
|
||||
frequency_resolution = sample_rate / fft_size
|
||||
max_bin = int(max_freq / frequency_resolution)
|
||||
|
||||
# Calculate the downsampling factor
|
||||
downsample_factor = int(sample_rate / nyquist_rate)
|
||||
segment_interval = 0.2 # seconds
|
||||
samples_per_interval = int(sample_rate * segment_interval)
|
||||
num_segments = len(data) // samples_per_interval
|
||||
|
||||
# Downsample the filtered signal
|
||||
filtered_data = decimate(filtered_data, downsample_factor)
|
||||
sample_rate = sample_rate // downsample_factor
|
||||
else:
|
||||
filtered_data = data
|
||||
|
||||
# Step 2: Define the increment (0.1 seconds) and segment size
|
||||
#increment = 0.2 # in seconds
|
||||
#segment_size = int(sample_rate * increment)
|
||||
segment_size = 1024
|
||||
increment = segment_size / sample_rate
|
||||
print(f"Segment size: {segment_size}")
|
||||
fft_results = np.zeros((num_segments, max_bin))
|
||||
|
||||
# Step 3: Process each segment
|
||||
num_segments = len(filtered_data) // segment_size
|
||||
|
||||
# Calculate the frequency resolution
|
||||
delta_f = sample_rate / segment_size
|
||||
|
||||
# Determine the bin range for desired frequency range (100 Hz to 2000 Hz)
|
||||
high_bin = int(max_freq / delta_f)
|
||||
|
||||
seg_off = int(sample_rate * 0.1)
|
||||
act_segs = len(filtered_data) // seg_off
|
||||
|
||||
# Initialize a 2D array to store DFT results (magnitude spectrum)
|
||||
# Only store the bins within the desired frequency range
|
||||
dft_results = np.zeros((act_segs, high_bin))
|
||||
|
||||
for i in range(act_segs):
|
||||
end = (i+1) * seg_off
|
||||
start = end - segment_size
|
||||
for i in range(num_segments):
|
||||
# Segment window is current position to fft_size samples in the past. As such some segments
|
||||
# will have overlap in which samples are used when fft_size > samples_per_interval
|
||||
end = (i + 1) * samples_per_interval
|
||||
start = end - fft_size
|
||||
|
||||
try:
|
||||
segment = filtered_data[start:end]
|
||||
segment = data[start:end]
|
||||
|
||||
# Step 4: Apply the DFT
|
||||
dft_result = fft(segment)
|
||||
fft_result = fft(segment)
|
||||
|
||||
magnitudes = np.abs(dft_result)
|
||||
magnitudes = np.abs(fft_result)
|
||||
total_energy = np.sum(magnitudes ** 2)
|
||||
normalized_magnitudes = magnitudes / np.sqrt(total_energy)
|
||||
normalized_magnitude = np.mean(normalized_magnitudes)
|
||||
|
||||
# Store the magnitude spectrum in the 2D array, only for the desired frequency range
|
||||
dft_results[i, :] = normalized_magnitudes[:high_bin]
|
||||
# Store the normalised magnitude spectrum only for the desired frequency range
|
||||
fft_results[i, :] = normalized_magnitudes[:max_bin]
|
||||
|
||||
scores = [
|
||||
10 * np.log10(np.sum(normalized_magnitudes[int((f-w)/delta_f):int((f+w)/delta_f)]))
|
||||
for f,w in zip(frequencies, widths)
|
||||
]
|
||||
tone_width = 6.25 # Hz
|
||||
def band(centre_frequency, width=tone_width):
|
||||
return (centre_frequency - width, centre_frequency + width)
|
||||
|
||||
codes = get_largest_two_indices(scores, 3.0)
|
||||
if codes:
|
||||
print([frequencies[code] for code in sorted(codes)])
|
||||
except:
|
||||
pass
|
||||
def bins(band):
|
||||
return (int(band[0]/frequency_resolution), int(band[1]/frequency_resolution))
|
||||
|
||||
def magnitude_in_band(band):
|
||||
low_bin, high_bin = bins(band)
|
||||
return np.sum(normalized_magnitudes[low_bin:high_bin])
|
||||
|
||||
scores = {
|
||||
tone:decibels(magnitude_in_band(band(frequency)))
|
||||
for tone,frequency in TONES.items()
|
||||
}
|
||||
|
||||
active_tones = find_top_two_keys(scores, 3.0)
|
||||
if active_tones:
|
||||
print(active_tones)
|
||||
|
||||
except Exception as e:
|
||||
print(e)
|
||||
|
||||
|
||||
# Step 5: Plot the spectrogram
|
||||
plt.figure(figsize=(12, 8))
|
||||
extent = [0, num_segments * increment, 0, max_freq]
|
||||
plt.imshow(dft_results.T, aspect='auto', origin='lower', extent=extent, cmap='viridis')
|
||||
plt.colorbar(label='Magnitude')
|
||||
plt.title('Spectrogram (100 Hz to 2000 Hz)')
|
||||
plt.xlabel('Time (s)')
|
||||
plt.ylabel('Frequency (Hz)')
|
||||
# Only import if we're actually plotting, these imports are pretty heavy.
|
||||
import pyqtgraph as pg
|
||||
from pyqtgraph.Qt import QtGui, QtWidgets, QtCore, mkQApp
|
||||
|
||||
app = pg.mkQApp("ImageView Example")
|
||||
window = QtWidgets.QMainWindow()
|
||||
window.resize(1280, 720)
|
||||
window.setWindowTitle(f"SELCAL FFT Analysis: {file_name}")
|
||||
|
||||
for freq in frequencies:
|
||||
plt.axhline(y=freq, color='r', linestyle='--', linewidth=0.5)
|
||||
plt.show()
|
||||
layout = pg.GraphicsLayoutWidget(show=True)
|
||||
window.setCentralWidget(layout)
|
||||
plot = layout.addPlot()
|
||||
|
||||
fft_view = pg.ImageItem()
|
||||
fft_view.setImage(fft_results)
|
||||
fft_view.setRect(QtCore.QRectF(0, 0, len(data) // sample_rate, max_freq))
|
||||
plot.addItem(fft_view)
|
||||
|
||||
# Note JMT: Requires matplotlib installed to use this colormap
|
||||
colormap = pg.colormap.get("CMRmap", source='matplotlib')
|
||||
colorbar = pg.ColorBarItem( values=(0,1), colorMap=colormap)
|
||||
colorbar.setImageItem(fft_view, insert_in=plot)
|
||||
|
||||
tone_pen = pg.mkPen(color=(20, 20, 20), width=1, style=QtCore.Qt.DashLine)
|
||||
for frequency in TONES.values():
|
||||
tone_line = pg.InfiniteLine(pos=frequency, angle=0, pen=tone_pen)
|
||||
plot.addItem(tone_line)
|
||||
|
||||
yticks = [(frequency, tone) for tone,frequency in TONES.items()]
|
||||
plot.getAxis('left').setTicks([yticks])
|
||||
|
||||
window.show()
|
||||
pg.exec()
|
||||
|
||||
27
scripts/tones.py
Executable file
27
scripts/tones.py
Executable file
@ -0,0 +1,27 @@
|
||||
#! /usr/bin/env python3
|
||||
|
||||
'''
|
||||
def frequency_16(n):
|
||||
return 100 * 10 ** ((n + 22) * 0.0225)
|
||||
'''
|
||||
|
||||
def frequency_32(n):
|
||||
return 100 * 10 ** ((n + 22) * 0.0225)
|
||||
|
||||
def width_16(n):
|
||||
return 12.5 * 10 ** (n * 0.045)
|
||||
|
||||
def width_32(n):
|
||||
return 6.25 * 10 ** (n * 0.0225)
|
||||
|
||||
|
||||
SELCAL_16 = 'ABCDEFGHJKLMPQRS'
|
||||
SELCAL_EXTRA = 'TUVWXYZ123456789'
|
||||
SELCAL_32 = [item for pair in zip(SELCAL_16, SELCAL_EXTRA) for item in pair]
|
||||
|
||||
TONES = {SELCAL_32[n]:frequency_32(n) for n in range(32)}
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Print SELCAL 32 tones in csv format, ascending frequency order
|
||||
print('\n'.join([f"{k},{v}" for k,v in TONES.items()]))
|
||||
Loading…
Reference in New Issue
Block a user