154 lines
4.5 KiB
Python
Executable File
154 lines
4.5 KiB
Python
Executable File
#! /usr/bin/env python3
|
|
|
|
import sys
|
|
import threading
|
|
import numpy as np
|
|
import signal as sig
|
|
from tones import TONES
|
|
from filters import anti_alias, bandpass_filter, note, smoothing_filter
|
|
from scipy import signal
|
|
from scipy.signal import butter, lfilter, decimate
|
|
|
|
import pyqtgraph as pg
|
|
from pyqtgraph.Qt import QtGui, QtWidgets, mkQApp
|
|
import time
|
|
|
|
# Shared stop flag: cleared by signal_handler() and polled by the loop in
# read_audio_from_stdin() before every read.
keep_running = True
|
|
def signal_handler(sig, frame):
    """Handle SIGINT by stopping the stdin reader and closing the GUI.

    Args:
        sig: Signal number passed in by the interpreter (unused). Note that
            inside this function the name hides the module alias ``sig``.
        frame: Interrupted stack frame passed in by the interpreter (unused).
    """
    global keep_running
    print('SIGINT received. Stopping...')

    # The reader loop checks this flag before each read.
    keep_running = False

    # Clearing the flag alone leaves the Qt event loop running, so the
    # process would never exit on Ctrl+C; ask the application to quit too.
    app = QtWidgets.QApplication.instance()
    if app is not None:
        app.quit()
|
|
|
|
|
|
class DetectorGui(QtWidgets.QMainWindow):
    """Main window showing one scrolling correlation trace per tone in ``TONES``.

    Attributes:
        plot: The plot item that holds every tone curve.
        tone_data: Maps tone name to its fixed-length float64 history array.
        tone_lines: Maps tone name to the curve drawn for that tone.
        tone_pens: Maps tone name to a pen in that tone's colour.

    NOTE(review): in this file push_tone()/push_tones() are reached from the
    stdin reader thread rather than the thread running the Qt event loop.
    Qt widgets are normally expected to be updated from the GUI thread --
    confirm whether these updates need to be marshalled across.
    """

    # Samples of history kept, and drawn, for each tone.
    HISTORY_LENGTH = 1000

    def __init__(self, *args, **kwargs):
        """Build the plot, the legend column and one zeroed trace per tone."""
        super().__init__(*args, **kwargs)

        layout = pg.GraphicsLayoutWidget(show=True)
        self.setCentralWidget(layout)
        self.setWindowTitle('SELCAL Detector')
        self.resize(1280, 800)

        self.plot = layout.addPlot()
        legend_view = layout.addViewBox()

        legend = pg.LegendItem(offset=(0, 0))
        legend.setParentItem(legend_view)

        # One colour per tone, taken from the 'CET-C6s' colour map.
        color_map = pg.colormap.get('CET-C6s')
        colors = color_map.getLookupTable(nPts=len(TONES))

        self.tone_data = {}
        self.tone_lines = {}
        self.tone_pens = {}

        for tone, color in zip(TONES, colors):
            history = np.zeros(self.HISTORY_LENGTH, dtype=np.float64)
            self.tone_data[tone] = history
            self.tone_lines[tone] = self.plot.plot(
                history, pen=pg.mkPen(color=color), name=tone
            )
            legend.addItem(self.tone_lines[tone], tone)
            self.tone_pens[tone] = pg.mkPen(color=color)

        self.plot.setLabel('left', 'Signal Correlation')
        self.plot.setLabel('bottom', 'Time (samples)')
        self.plot.showGrid(x=True, y=True)

        # Keep the legend in a fixed-width second column.
        legend_view.setFixedWidth(80)
        layout.ci.layout.setColumnFixedWidth(1, 80)

        self.show()

    def set_position(self, pos):
        """Placeholder hook; currently ignores *pos* and does nothing."""

    def push_tone(self, tone, value):
        """Append a single sample to *tone*'s trace and redraw it.

        Args:
            tone: Key of the trace to update (a key of ``TONES``).
            value: The new sample.
        """
        self.push_tones(tone, (value,))

    def push_tones(self, tone, values):
        """Append *values* to *tone*'s trace, dropping the oldest samples.

        The history keeps a fixed length: new samples enter on the right and
        an equal number of old samples fall off the left.

        Args:
            tone: Key of the trace to update (a key of ``TONES``).
            values: Sequence of new samples, oldest first. An empty sequence
                leaves the trace untouched; if more samples arrive than the
                history can hold, only the newest ones are kept.
        """
        history = self.tone_data[tone]
        incoming = np.asarray(values, dtype=history.dtype).ravel()
        count = incoming.size

        # history[-0:] would address the whole buffer, so an empty update
        # must be handled before any slicing.
        if count == 0:
            return

        # Build a fresh array instead of shifting in place, so the array
        # previously handed to the curve is never mutated afterwards.
        if count >= history.size:
            updated = incoming[-history.size:].copy()
        else:
            updated = np.concatenate((history[count:], incoming))

        self.tone_data[tone] = updated
        self.tone_lines[tone].setData(updated)
|
|
|
|
|
|
# Set up the Qt application object through pyqtgraph before building the window.
mkQApp("Correlation matrix display")

# Module-level window instance; process_audio_chunk() pushes new samples into it.
gui = DetectorGui()
|
|
|
|
def read_audio_from_stdin(chunk_size, process_chunk):
    """Read raw int16 audio from stdin and pass it on chunk by chunk.

    The loop ends when stdin is exhausted or when the module-level
    ``keep_running`` flag has been cleared.

    Args:
        chunk_size: Number of int16 samples requested per read.
        process_chunk: Callable invoked with each chunk as a 1-D ``np.int16``
            array.
    """
    bytes_per_sample = np.dtype(np.int16).itemsize
    read_size = chunk_size * bytes_per_sample
    stream = sys.stdin.buffer

    # Bytes of an incomplete sample left over from the previous read.
    pending = b""

    while keep_running:
        data = stream.read(read_size)

        # No more data available: stop reading.
        if not data:
            break

        # A short read can end in the middle of a sample, and np.frombuffer
        # rejects buffers that are not a whole number of items. Convert only
        # complete samples and carry the remainder into the next read.
        data = pending + data
        usable = len(data) - len(data) % bytes_per_sample
        pending = data[usable:]

        if usable:
            process_chunk(np.frombuffer(data[:usable], dtype=np.int16))
|
|
|
|
# Module-level sample rate in Hz.
sample_rate = 44_100

# Second argument handed to note() when process_audio_chunk() builds each
# reference tone.
note_length = 0.5

# Multiplied by the working sample rate to get the number of correlation
# samples trimmed from each end of a chunk before plotting.
guard_duration = 0.01
|
|
|
|
def process_audio_chunk(audio_chunk):
    """Correlate one audio chunk against every tone and plot the result.

    The chunk is passed through ``anti_alias``, scaled by its peak magnitude,
    correlated with a reference tone for each entry of ``TONES``, smoothed,
    and pushed to the module-level ``gui``.

    Args:
        audio_chunk: 1-D array of samples, treated as sampled at 44100 Hz.
            An empty chunk is ignored.
    """
    if len(audio_chunk) == 0:
        return

    started = time.perf_counter()

    data, rate, _decimation = anti_alias(audio_chunk, 44100, 4800, 8)
    if np.size(data) == 0:
        return

    # Scale by the peak magnitude. The signed maximum would flip or blow up
    # a chunk whose largest sample is zero or negative. The small offset
    # keeps the division finite for silence.
    data = data / (np.max(np.abs(data)) + 0.0001)

    guard_samples = int(guard_duration * rate)

    # NOTE(review): the reference tones are rebuilt for every chunk; if
    # note() is a pure function of its arguments they could be cached.
    for tone, freq in TONES.items():
        pure = note(freq, note_length, rate=rate)
        envelope = smoothing_filter(
            np.abs(signal.correlate(data, pure, mode='same')), 64
        )

        gui.push_tone(tone, np.mean(envelope))

        # Use an explicit stop index: with a zero guard, "-guard_samples"
        # would be -0 and select nothing. Short chunks can still leave
        # nothing between the guards, in which case there is nothing to plot.
        trimmed = envelope[guard_samples:len(envelope) - guard_samples:100]
        if len(trimmed):
            gui.push_tones(tone, trimmed)

    # Report chunks that took longer than 0.1 s to process and draw.
    if time.perf_counter() - started > 0.1:
        print("FAILED")
|
|
|
|
if __name__ == '__main__':
    # Script entry point: read audio on a background thread while the Qt
    # event loop runs on the main thread.
    sig.signal(sig.SIGINT, signal_handler)

    chunk_duration = 0.1  # seconds
    sample_rate = 44100
    channels = 1
    chunk_size = int(sample_rate * chunk_duration) * channels

    reader_thread = threading.Thread(
        target=read_audio_from_stdin,
        args=(chunk_size, process_audio_chunk),
    )
    reader_thread.daemon = True
    reader_thread.start()

    # Blocks until the Qt application quits.
    pg.exec()

    # The GUI is gone, so tell the reader to stop. It may still be blocked
    # inside a stdin read, so wait only briefly; as a daemon thread it cannot
    # keep the process alive once the main thread finishes.
    keep_running = False
    reader_thread.join(timeout=1.0)
|