Compare commits

4 Commits

7 changed files with 196 additions and 72 deletions

scripts/.gitignore (vendored): 1 line changed

@@ -1,2 +1,3 @@
 *.wav
 __pycache__/
+junk/

scripts/README.md (new file): 53 lines

@@ -0,0 +1,53 @@
# Planned algorithm

From selcald:

One possible solution to this problem is to step back from trying to determine the frequencies of the individual tones and to instead verify that:

- There are two tones present
- The difference in frequency between the two tones matches a pair of known tones in the alphabet
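A minimal sketch of that difference check, under stated assumptions: the `TONES` dict below is a hypothetical stand-in for the repo's tones.csv (designator to frequency in Hz, only four entries shown), and `match_tone_pair` is an invented name. The point is just that the alphabet reduces to a table of pairwise spacings, matched by nearest fit:

```python
from itertools import combinations

# Hypothetical stand-in for tones.csv: designator -> frequency in Hz.
TONES = {'A': 312.6, 'B': 346.7, 'C': 384.6, 'D': 426.6}

def match_tone_pair(measured_difference, tolerance_hz=2.0):
    """Find the designator pair whose frequency spacing best matches the measurement."""
    best = min(combinations(TONES.items(), 2),
               key=lambda p: abs(abs(p[0][1] - p[1][1]) - measured_difference))
    (t1, f1), (t2, f2) = best
    if abs(abs(f1 - f2) - measured_difference) <= tolerance_hz:
        return t1, t2
    return None  # nothing in the alphabet is spaced like this

print(match_tone_pair(34.0))  # -> ('A', 'B'): their spacing is 34.1 Hz
```

Since the tones are roughly log-spaced, adjacent spacings get close together; taking the best fit with a tight tolerance matters more than accepting the first pair within tolerance.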
The author of selcald seems awfully hung up on trying to ensure correct detection of the silent period
between two tones. I don't think this is worth actively trying to detect; it is more important
to find two dominant tones for a period of ~1 sec, followed by another ~1 sec with two dominant tones.
The silence period is irrelevant: the spec allows 0.2 +/- 0.1 sec _at the transmitter_, which
could be too short to reliably distinguish between successive `A*-A*` tones at the receiver.

## Terrible pseudocode

```
collect samples from audio source <wavfile, arecord>
low pass & decimate if necessary
<simultaneously>
    run wide fft over a large sample window
    detect peak bin (with some hysteresis)
    if peak bin within tone band:
        assume difference is due to doppler shift
        gradually adjust expected freq(s) for tone(s) to match
    record RMS energy into buffer
<simultaneously>
    run correlation over small sample window
    For each tone in the alphabet:
        cross correlate the audio with the adjusted tone
        record normalized tone energy into tone buffer
For each tone record in tone buffer:
    sort tones by amplitude
    if:
        - the two highest amplitude tones are within 3 dB of each other
        - the two highest amplitude tones are at least 3 dB greater than the next nearest tone
        - the sum of the two highest tones accounts for at least 60% of the modulation energy (-2.22 dB)
          (as determined relative to the stable RMS energy calc'd by the wide fft)
          (nominally 90% / -0.458 dB)
    then:
        record dominant two tones in tone buffer
        if any two tones are the dominant tones for the majority of the previous 1 sec:
            record dominant two tones in code buffer
            reset tone detection counter (prohibit detecting tones for another 0.5 sec)
upon code record in code buffer:
    if the dominant code previously occurred within 1.25 sec:
        emit SELCAL detection
        reset code detection counter (prohibit detecting codes for another 0.5 sec)
```
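As a sanity check on the thresholds above, a hedged Python sketch of just the dominance test. `tone_energies` is assumed to map designator to smoothed correlation energy for one window and `total_energy` to the wide-FFT RMS figure; neither name exists in the repo yet, and the 10*log10 convention assumes energies rather than amplitudes:

```python
import numpy as np

def dominant_pair(tone_energies, total_energy, ratio_db=3.0, min_fraction=0.6):
    """Return the two dominant designators for this window, or None."""
    def db(a, b):
        return 10 * np.log10(a / b)

    ranked = sorted(tone_energies.items(), key=lambda kv: kv[1], reverse=True)
    (t1, e1), (t2, e2), (_, e3) = ranked[0], ranked[1], ranked[2]
    if db(e1, e2) > ratio_db:                    # top two within 3 dB of each other
        return None
    if e3 > 0 and db(e2, e3) < ratio_db:         # ...and 3 dB clear of the third
        return None
    if (e1 + e2) < min_fraction * total_energy:  # ...carrying >= 60% of the energy
        return None                              # (10*log10(0.6) ~ -2.22 dB)
    return t1, t2
```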

scripts/audio-capture.sh (new executable): 8 lines

@@ -0,0 +1,8 @@
#! /bin/sh

set -eux

sample_rate="44100"
channels="2"

arecord -t raw -c ${channels} -f S16_LE -r ${sample_rate}

scripts/audio-pipe.sh (new executable): 9 lines

@@ -0,0 +1,9 @@
#! /bin/sh

set -eux

audio_file="$1"

sample_rate="44100"
channels="2"
ffmpeg -i "${audio_file}" -f s16le -ac ${channels} -ar ${sample_rate} -
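Both scripts emit the same interleaved s16le / 44.1 kHz / 2 ch stream on stdout, so either can feed anything in this change that reads raw audio from stdin. A plausible wiring, not taken from the repo (the wav file name is a stand-in):

```sh
# Live capture from the default ALSA device into the scrolling plot
scripts/audio-capture.sh | scripts/live.py

# Or replay an existing recording through the same pipeline
scripts/audio-pipe.sh recording.wav | scripts/live.py
```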

scripts/filters.py (modified)

@@ -1,20 +1,35 @@
+import numpy as np
 from scipy.signal import butter, lfilter, decimate

 def anti_alias(data, sample_rate, max_frequency):
+    FILTER_HEADROOM = 1.2
     nyquist_rate = 2 * max_frequency
+    downsample_factor = 1
     if sample_rate > nyquist_rate:
-        filtered_data = lowpass_filter(data, max_frequency, sample_rate)
+        filtered_data = lowpass_filter(data, max_frequency * FILTER_HEADROOM, sample_rate)
         downsample_factor = int(sample_rate / nyquist_rate)
         filtered_data = decimate(filtered_data, downsample_factor)
         sample_rate = sample_rate // downsample_factor
     else:
         filtered_data = data
-    return filtered_data, sample_rate
+    return filtered_data, sample_rate, downsample_factor
+
+def smoothing_filter(data, window_size=256):
+    window = np.ones(window_size) / window_size
+    return np.convolve(data, window, mode='same')
+
+# Stolen from selcald
+def note(freq, length, amp=1.0, rate=44100):
+    if freq == 0:
+        data = np.zeros(int(length * rate))
+    else:
+        t = np.linspace(0, length, int(length * rate))
+        data = np.sin(2 * np.pi * freq * t) * amp
+    return data
+
 # These originally came from https://scipy.github.io/old-wiki/pages/Cookbook/ButterworthBandpass,
 # but they've been copied around the internet so many times that ChatGPT now produces them verbatim.
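For reference, a minimal sketch of how the reworked `anti_alias` is called after this change: the caller now gets the decimation factor back as a third return value, and `FILTER_HEADROOM` pushes the low-pass cutoff 20% past the band of interest so the passband edge isn't nibbled before `decimate` resamples. The wav file name is a stand-in:

```python
from scipy.io import wavfile
from filters import anti_alias

sample_rate, data = wavfile.read('capture.wav')  # stand-in input file
# Keep content below 4800 Hz; decimation happens only when the input rate
# exceeds twice that, and the factor actually applied is reported back.
data, sample_rate, factor = anti_alias(data, sample_rate, 4800)
print(f'decimated by {factor}; new rate {sample_rate} Hz')
```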

scripts/live.py (new executable): 93 lines

@@ -0,0 +1,93 @@
#! /usr/bin/env python3

import sys
import threading
import numpy as np
import signal

import pyqtgraph as pg

data1 = np.random.normal(size=300)
ptr1 = 0
win = pg.GraphicsLayoutWidget(show=True)
win.setWindowTitle('pyqtgraph example: Scrolling Plots')
plot = win.addPlot()
curve = plot.plot(data1)

keep_running = True

def signal_handler(sig, frame):
    global keep_running
    print('SIGINT received. Stopping...')
    keep_running = False

def read_audio_from_stdin(chunk_size, process_chunk):
    global keep_running
    while keep_running:
        # 2 bytes per sample for int16
        read_size = chunk_size * 2
        data = sys.stdin.buffer.read(read_size)
        # Break the loop if no more data is available
        if not data:
            break
        # Convert the binary data to a numpy array of int16
        audio_chunk = np.frombuffer(data, dtype=np.int16)
        process_chunk(audio_chunk)

def process_audio_chunk(audio_chunk):
    # Example processing: print and plot the chunk length.
    # Note: this runs on the reader thread; Qt isn't thread-safe, which is
    # tolerable for a throwaway demo but should move to the GUI thread later.
    global data1, ptr1, curve
    print(f"Read chunk: {len(audio_chunk)}")
    data1[:-1] = data1[1:]  # shift data in the array one sample left
                            # (see also: np.roll)
    data1[-1] = len(audio_chunk)
    ptr1 += 1
    curve.setData(data1)
    curve.setPos(ptr1, 0)

if __name__ == '__main__':
    signal.signal(signal.SIGINT, signal_handler)

    chunk_duration = 0.1  # seconds
    sample_rate = 44100
    channels = 2
    chunk_size = int(sample_rate * chunk_duration) * channels

    reader_thread = threading.Thread(target=read_audio_from_stdin, args=(chunk_size, process_audio_chunk))
    reader_thread.daemon = True
    reader_thread.start()

    pg.exec()

    # Window closed: stop the reader and wait for it to notice.
    keep_running = False
    reader_thread.join()

'''
# 1) Simplest approach -- update data in the array such that plot appears to scroll
#    In these examples, the array size is fixed.
p1 = win.addPlot()
p2 = win.addPlot()
data1 = np.random.normal(size=300)
curve1 = p1.plot(data1)
curve2 = p2.plot(data1)
ptr1 = 0

def update1():
    global data1, ptr1
    data1[:-1] = data1[1:]  # shift data in the array one sample left
                            # (see also: np.roll)
    data1[-1] = np.random.normal()
    curve1.setData(data1)
    ptr1 += 1
    curve2.setData(data1)
    curve2.setPos(ptr1, 0)
'''
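One gap worth flagging in live.py: with `channels = 2` the int16 stream is interleaved L/R frames, and `process_audio_chunk` currently treats each chunk as a flat array. A sketch of the reshape it would need before any per-channel or mono math (the mixdown is an assumption, not something live.py does yet):

```python
import numpy as np

def split_channels(audio_chunk, channels=2):
    """Turn an interleaved int16 chunk into a (frames, channels) array."""
    frames = audio_chunk.reshape(-1, channels)
    # Mix down to mono in float to avoid int16 overflow when averaging.
    mono = frames.astype(np.float64).mean(axis=1)
    return frames, mono
```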

@@ -5,85 +5,30 @@ import sys
 import numpy as np
 from scipy import signal
 from scipy.io import wavfile
-from scipy.signal import butter, lfilter
+from tones import TONES
+from filters import bandpass_filter, note, smoothing_filter, anti_alias
-
-tones = {}
-with open('tones.csv', newline='') as csvfile:
-    reader = csv.DictReader(csvfile)
-    for row in reader:
-        tones[row['designator']] = float(row['frequency'])
-
-'''
-def freq_of_key(midi_key):
-    return 440.0 * (2 ** ((midi_key - 69)/12))
-
-tones = {}
-for c in range(65, 90):
-    tones[c] = freq_of_key(c)
-'''
-
-# Shamelessly lifted from
-# https://scipy.github.io/old-wiki/pages/Cookbook/ButterworthBandpass
-def butter_bandpass(lowcut, highcut, fs, order=5):
-    nyq = 0.5 * fs
-    low = lowcut / nyq
-    high = highcut / nyq
-    b, a = butter(order, [low, high], btype='band')
-    return b, a
-
-def butter_bandpass_filter(data, lowcut, highcut, fs, order=5):
-    b, a = butter_bandpass(lowcut, highcut, fs, order=order)
-    y = lfilter(b, a, data)
-    return y
-
-# tone synthesis
-def note(freq, cycles, amp=32767.0, rate=44100):
-    len = cycles * (1.0/rate)
-    t = np.linspace(0, len, int(len * rate))
-    if freq == 0:
-        data = np.zeros(int(len * rate))
-    else:
-        data = np.sin(2 * np.pi * freq * t) * amp
-    return data.astype(int)
-
-def decimate_from_sample_rate(sample_rate):
-    if sample_rate == 44100:
-        return 4  # rate = 11025, Fmax = 5512.5 Hz
-    elif sample_rate == 48000:
-        return 5  # rate = 9600, Fmax = 4800 Hz
-    elif sample_rate == 22050:
-        return 2  # rate = 11025, Fmax = 5512.5 Hz
-    elif sample_rate == 11025:
-        return 1  # rate = 11025, Fmax = 5512.5 Hz
-    else:
-        raise ValueError("Sample rate not supported")
+
+pure_sample_length = 0.1

 if __name__ == '__main__':
-    # TODO JMT: What is this?
-    FLT_LEN = 2000
-
     file_name = sys.argv[1]
     sample_rate, data = wavfile.read(file_name)
     print(f"{file_name}: {len(data)} samples @ {sample_rate} Hz")

-    decimate = decimate_from_sample_rate(sample_rate)
-    if decimate > 1:
-        data = signal.decimate(data, decimate)
-        sample_rate = sample_rate / decimate
-        print(f'Length after decimation: {len(data)} samples')
-
-    data = butter_bandpass_filter(data, 270, 1700, sample_rate, order=8)
+    if len(data.shape) == 2:
+        data = data.mean(axis=1)
+
+    # Coarse normalisation
+    data = data / np.max(data)
+
+    # TODO JMT: Find out why the correlation step fails when max frequency <= 2 * nyquist rate
+    data, sample_rate, decimation = anti_alias(data, sample_rate, 4800)
+    print(f'Length after decimation: {len(data)} samples (/{decimation}, {sample_rate} Hz)')

-    pure_signals = {tone:note(freq, FLT_LEN, rate=sample_rate) for tone,freq in tones.items()}
+    pure_signals = {tone:note(freq, pure_sample_length, rate=sample_rate) for tone,freq in TONES.items()}
     correlations = {tone:np.abs(signal.correlate(data, pure, mode='same')) for tone,pure in pure_signals.items()}
-
-    N = FLT_LEN // 8  # Rolling average length
-    cumsum_convolution = np.ones(N)/N
-    massaged = {tone:np.convolve(correlation, cumsum_convolution, mode='valid') for tone,correlation in correlations.items()}
+    massaged = {tone:smoothing_filter(correlation) for tone,correlation in correlations.items()}

     # Only import if we're actually plotting, these imports are pretty heavy.
     import pyqtgraph as pg
@@ -100,7 +45,7 @@ if __name__ == '__main__':
     legend.setParentItem(legend_view)

     color_map = pg.colormap.get('CET-C6s')
-    colors = color_map.getLookupTable(nPts=len(tones))
+    colors = color_map.getLookupTable(nPts=len(TONES))

     for (tone, correlation), color in zip(massaged.items(), colors):
         line = plot.plot(correlation, pen=pg.mkPen(color=color), fillLevel=0.1, name=tone)
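A footnote on the new `# Coarse normalisation` line: `np.max(data)` looks only at the positive half of the waveform, so a clip whose largest excursion is negative ends up scaled past ±1, and all-silence input divides by zero. If that ever bites, a more defensive variant:

```python
import numpy as np

def normalize(data):
    """Scale audio into [-1, 1] by its largest absolute sample."""
    peak = np.max(np.abs(data))
    return data / peak if peak > 0 else data
```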