Compare commits

..

7 Commits

8 changed files with 164 additions and 117 deletions

1
.gitignore vendored
View File

@ -1 +1,2 @@
build/ build/
.*/

1
scripts/.gitignore vendored
View File

@ -1 +1,2 @@
*.wav *.wav
__pycache__/

42
scripts/filters.py Normal file
View File

@ -0,0 +1,42 @@
from scipy.signal import butter, lfilter, decimate
def anti_alias(data, sample_rate, max_frequency):
nyquist_rate = 2 * max_frequency
if sample_rate > nyquist_rate:
filtered_data = lowpass_filter(data, max_frequency, sample_rate)
downsample_factor = int(sample_rate / nyquist_rate)
filtered_data = decimate(filtered_data, downsample_factor)
sample_rate = sample_rate // downsample_factor
else:
filtered_data = data
return filtered_data, sample_rate
# These originally came from https://scipy.github.io/old-wiki/pages/Cookbook/ButterworthBandpass,
# but they've been copied around the internet so many times that ChatGPT now produces them verbatim.
def butter_bandpass(lowcut, highcut, fs, order=5):
nyquist = 0.5 * fs
low = lowcut / nyquist
high = highcut / nyquist
b, a = butter(order, [low, high], btype='band')
return b, a
def bandpass_filter(data, lowcut, highcut, fs, order=5):
b, a = butter_bandpass(lowcut, highcut, fs, order=order)
y = lfilter(b, a, data)
return y
def butter_lowpass(cutoff, fs, order=5):
nyquist = 0.5 * fs
normal_cutoff = cutoff / nyquist
b, a = butter(order, normal_cutoff, btype='low', analog=False)
return b, a
def lowpass_filter(data, cutoff, fs, order=5):
b, a = butter_lowpass(cutoff, fs, order=order)
y = lfilter(b, a, data)
return y

View File

@ -1,147 +1,106 @@
#! /usr/bin/env python3 #! /usr/bin/env python3
import numpy as np import numpy as np
from scipy.io import wavfile from scipy.io import wavfile
from scipy.fft import fft from scipy.fft import fft
from scipy.signal import butter, lfilter, decimate from filters import anti_alias
from tones import TONES
from utilities import *
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import sys import sys
frequencies = np.array([10 ** ((n + 22) * 0.0225 + 2) for n in range(32)]) file_name = sys.argv[1]
widths = np.array([6.25 * 10 ** (n * 0.0225) for n in range(32)]) sample_rate, data = wavfile.read(file_name)
widths = np.array([6.25 for n in range(32)])
def butter_bandpass(lowcut, highcut, fs, order=5):
nyquist = 0.5 * fs
low = lowcut / nyquist
high = highcut / nyquist
b, a = butter(order, [low, high], btype='band')
return b, a
def bandpass_filter(data, lowcut, highcut, fs, order=5):
b, a = butter_bandpass(lowcut, highcut, fs, order=order)
y = lfilter(b, a, data)
return y
def butter_lowpass(cutoff, fs, order=5):
nyquist = 0.5 * fs
normal_cutoff = cutoff / nyquist
b, a = butter(order, normal_cutoff, btype='low', analog=False)
return b, a
def lowpass_filter(data, cutoff, fs, order=5):
b, a = butter_lowpass(cutoff, fs, order=order)
y = lfilter(b, a, data)
return y
def get_largest_two_indices(numbers, threshold):
# Check if the list has at least three numbers
if len(numbers) < 3:
return None
# Find the indices of the three largest numbers in the list
indices = sorted(range(len(numbers)), key=lambda i: numbers[i], reverse=True)
largest_index = indices[0]
second_largest_index = indices[1]
third_largest_index = indices[2]
# Check if the largest and second largest numbers are at least threshold larger than the third largest
if numbers[largest_index] - numbers[third_largest_index] >= threshold and \
numbers[second_largest_index] - numbers[third_largest_index] >= threshold:
return largest_index, second_largest_index
else:
return None
# Step 1: Read the WAV file
sample_rate, data = wavfile.read(sys.argv[1])
# Handle stereo audio by converting to mono if needed # Handle stereo audio by converting to mono if needed
if len(data.shape) == 2: if len(data.shape) == 2:
data = data.mean(axis=1) data = data.mean(axis=1)
# Define the maximum frequency of interest and Nyquist rate max_freq = 1600.0
max_freq = 1600.0 # 2000 Hz data, sample_rate = anti_alias(data, sample_rate, max_freq)
nyquist_rate = 2 * max_freq # Nyquist rate to prevent aliasing
# If the sample rate is higher than the Nyquist rate, downsample fft_size = 1024 # Must be larger than max_freq TODO JMT: fix this, zero-pad
if sample_rate > nyquist_rate: frequency_resolution = sample_rate / fft_size
# Apply a lowpass filter before downsampling max_bin = int(max_freq / frequency_resolution)
cutoff = max_freq #+ 500 # Lowpass filter cutoff slightly above max_freq to prevent aliasing
filtered_data = lowpass_filter(data, cutoff, sample_rate)
# Calculate the downsampling factor segment_interval = 0.2 # seconds
downsample_factor = int(sample_rate / nyquist_rate) samples_per_interval = int(sample_rate * segment_interval)
num_segments = len(data) // samples_per_interval
# Downsample the filtered signal
filtered_data = decimate(filtered_data, downsample_factor)
sample_rate = sample_rate // downsample_factor
else:
filtered_data = data
# Step 2: Define the increment (0.1 seconds) and segment size fft_results = np.zeros((num_segments, max_bin))
#increment = 0.2 # in seconds
#segment_size = int(sample_rate * increment)
segment_size = 1024
increment = segment_size / sample_rate
print(f"Segment size: {segment_size}")
# Step 3: Process each segment for i in range(num_segments):
num_segments = len(filtered_data) // segment_size # Segment window is current position to fft_size samples in the past. As such some segments
# will have overlap in which samples are used when fft_size > samples_per_interval
# Calculate the frequency resolution end = (i + 1) * samples_per_interval
delta_f = sample_rate / segment_size start = end - fft_size
# Determine the bin range for desired frequency range (100 Hz to 2000 Hz)
high_bin = int(max_freq / delta_f)
seg_off = int(sample_rate * 0.1)
act_segs = len(filtered_data) // seg_off
# Initialize a 2D array to store DFT results (magnitude spectrum)
# Only store the bins within the desired frequency range
dft_results = np.zeros((act_segs, high_bin))
for i in range(act_segs):
end = (i+1) * seg_off
start = end - segment_size
try: try:
segment = filtered_data[start:end] segment = data[start:end]
# Step 4: Apply the DFT fft_result = fft(segment)
dft_result = fft(segment)
magnitudes = np.abs(dft_result) magnitudes = np.abs(fft_result)
total_energy = np.sum(magnitudes ** 2) total_energy = np.sum(magnitudes ** 2)
normalized_magnitudes = magnitudes / np.sqrt(total_energy) normalized_magnitudes = magnitudes / np.sqrt(total_energy)
normalized_magnitude = np.mean(normalized_magnitudes)
# Store the magnitude spectrum in the 2D array, only for the desired frequency range # Store the normalised magnitude spectrum only for the desired frequency range
dft_results[i, :] = normalized_magnitudes[:high_bin] fft_results[i, :] = normalized_magnitudes[:max_bin]
scores = [ tone_width = 6.25 # Hz
10 * np.log10(np.sum(normalized_magnitudes[int((f-w)/delta_f):int((f+w)/delta_f)])) def band(centre_frequency, width=tone_width):
for f,w in zip(frequencies, widths) return (centre_frequency - width, centre_frequency + width)
]
codes = get_largest_two_indices(scores, 3.0) def bins(band):
if codes: return (int(band[0]/frequency_resolution), int(band[1]/frequency_resolution))
print([frequencies[code] for code in sorted(codes)])
except: def magnitude_in_band(band):
pass low_bin, high_bin = bins(band)
return np.sum(normalized_magnitudes[low_bin:high_bin])
scores = {
tone:decibels(magnitude_in_band(band(frequency)))
for tone,frequency in TONES.items()
}
active_tones = find_top_two_keys(scores, 3.0)
if active_tones:
print(active_tones)
except Exception as e:
print(e)
# Step 5: Plot the spectrogram # Only import if we're actually plotting, these imports are pretty heavy.
plt.figure(figsize=(12, 8)) import pyqtgraph as pg
extent = [0, num_segments * increment, 0, max_freq] from pyqtgraph.Qt import QtWidgets, QtCore
plt.imshow(dft_results.T, aspect='auto', origin='lower', extent=extent, cmap='viridis')
plt.colorbar(label='Magnitude')
plt.title('Spectrogram (100 Hz to 2000 Hz)')
plt.xlabel('Time (s)')
plt.ylabel('Frequency (Hz)')
app = pg.mkQApp("ImageView Example")
window = QtWidgets.QMainWindow()
window.resize(1280, 720)
window.setWindowTitle(f"SELCAL FFT Analysis: {file_name}")
for freq in frequencies: layout = pg.GraphicsLayoutWidget(show=True)
plt.axhline(y=freq, color='r', linestyle='--', linewidth=0.5) window.setCentralWidget(layout)
plt.show() plot = layout.addPlot()
fft_view = pg.ImageItem()
fft_view.setImage(fft_results)
fft_view.setRect(QtCore.QRectF(0, 0, len(data) // sample_rate, max_freq))
plot.addItem(fft_view)
colormap = pg.colormap.get("inferno")
colorbar = pg.ColorBarItem( values=(0,1), colorMap=colormap)
colorbar.setImageItem(fft_view, insert_in=plot)
tone_pen = pg.mkPen(color=(20, 20, 20), width=1, style=QtCore.Qt.DashLine)
for frequency in TONES.values():
tone_line = pg.InfiniteLine(pos=frequency, angle=0, pen=tone_pen)
plot.addItem(tone_line)
yticks = [(frequency, tone) for tone,frequency in TONES.items()]
plot.getAxis('left').setTicks([yticks])
window.show()
pg.exec()

27
scripts/tones.py Executable file
View File

@ -0,0 +1,27 @@
#! /usr/bin/env python3
'''
def frequency_16(n):
return 100 * 10 ** ((n + 22) * 0.0225)
'''
def frequency_32(n):
return 100 * 10 ** ((n + 22) * 0.0225)
def width_16(n):
return 12.5 * 10 ** (n * 0.045)
def width_32(n):
return 6.25 * 10 ** (n * 0.0225)
SELCAL_16 = 'ABCDEFGHJKLMPQRS'
SELCAL_EXTRA = 'TUVWXYZ123456789'
SELCAL_32 = [item for pair in zip(SELCAL_16, SELCAL_EXTRA) for item in pair]
TONES = {SELCAL_32[n]:frequency_32(n) for n in range(32)}
if __name__ == "__main__":
# Print SELCAL 32 tones in csv format, ascending frequency order
print('\n'.join([f"{k},{v}" for k,v in TONES.items()]))

17
scripts/utilities.py Normal file
View File

@ -0,0 +1,17 @@
import numpy as np
def decibels(f):
return 10 * np.log10(f)
def find_top_two_keys(d, threshold):
sorted_items = sorted(d.items(), key=lambda item: item[1], reverse=True)
if len(sorted_items) < 3:
return None
top_two = sorted_items[:2]
third_value = sorted_items[2][1]
if top_two[0][1] - third_value < threshold or top_two[1][1] - third_value < threshold:
return None
return top_two[0][0], top_two[1][0]

Binary file not shown.