selcal/scripts/live.py

154 lines
4.5 KiB
Python
Executable File

#! /usr/bin/env python3
import sys
import threading
import numpy as np
import signal as sig
from tones import TONES
from filters import anti_alias, bandpass_filter, note, smoothing_filter
from scipy import signal
from scipy.signal import butter, lfilter, decimate
import pyqtgraph as pg
from pyqtgraph.Qt import QtGui, QtWidgets, mkQApp
import time
# Shared run flag: polled by the stdin reader loop, cleared by the SIGINT handler.
keep_running = True


def signal_handler(sig, frame):
    """Handle SIGINT by asking the reader loop to stop.

    The signature follows the ``signal.signal`` handler convention.

    Args:
        sig: Number of the signal that was delivered (unused).
        frame: Stack frame that was interrupted (unused).
    """
    global keep_running
    print('SIGINT received. Stopping...')
    keep_running = False
class DetectorGui(QtWidgets.QMainWindow):
    """Main window that draws one rolling correlation trace per entry of TONES.

    Every tone owns a fixed-length history buffer (``tone_data``), the curve
    that displays it (``tone_lines``) and a pen of the same colour
    (``tone_pens``).  New values are shifted in from the right so the newest
    sample is always the last element of the buffer.

    NOTE(review): in this file ``process_audio_chunk`` calls ``push_tone`` /
    ``push_tones`` from the stdin reader thread, so the curves are updated
    from outside the GUI thread -- confirm this is safe for the Qt binding in
    use, or marshal the updates through a signal.
    """

    # Number of samples kept (and drawn) for each tone trace.
    HISTORY_LENGTH = 1000

    def __init__(self, *args, **kwargs):
        """Build the plot, the side legend and one empty trace per tone.

        All positional and keyword arguments are forwarded to
        ``QtWidgets.QMainWindow``.
        """
        super().__init__(*args, **kwargs)

        layout = pg.GraphicsLayoutWidget(show=True)
        self.setCentralWidget(layout)
        self.setWindowTitle('SELCAL Detector')
        self.resize(1280, 800)

        # Column 0 holds the plot, column 1 a view box that hosts the legend.
        self.plot = layout.addPlot()
        legend_view = layout.addViewBox()
        legend = pg.LegendItem(offset=(0, 0))
        legend.setParentItem(legend_view)

        # One colour per tone, taken from the 'CET-C6s' colour map.
        color_map = pg.colormap.get('CET-C6s')
        colors = color_map.getLookupTable(nPts=len(TONES))

        self.tone_data = {}
        self.tone_lines = {}
        self.tone_pens = {}
        for tone, color in zip(TONES, colors):
            history = np.zeros(self.HISTORY_LENGTH, dtype=np.float64)
            self.tone_data[tone] = history
            self.tone_lines[tone] = self.plot.plot(
                history, pen=pg.mkPen(color=color), name=tone
            )
            legend.addItem(self.tone_lines[tone], tone)
            self.tone_pens[tone] = pg.mkPen(color=color)

        self.plot.setLabel('left', 'Signal Correlation')
        self.plot.setLabel('bottom', 'Time (samples)')
        self.plot.showGrid(x=True, y=True)

        # Keep the legend column narrow so the plot gets the remaining width.
        legend_view.setFixedWidth(80)
        layout.ci.layout.setColumnFixedWidth(1, 80)
        self.show()

    def set_position(self, pos):
        """Placeholder; currently does nothing with *pos*."""
        pass

    def push_tone(self, tone, value):
        """Append a single *value* to the history of *tone* and redraw it."""
        self.push_tones(tone, [value])

    def push_tones(self, tone, values):
        """Append *values* to the history of *tone* and redraw its curve.

        The oldest samples are dropped so the buffer keeps its length.  An
        empty *values* is a no-op; when *values* is longer than the buffer
        only its newest samples are kept.

        Args:
            tone: Key of the trace to update (a key of ``tone_data``).
            values: One-dimensional sequence of new samples, oldest first.
        """
        values = np.asarray(values)
        if values.size == 0:
            # ``buf[-0:]`` would address the whole buffer, so bail out early.
            return

        history = self.tone_data[tone]
        values = values[-history.size:]
        count = len(values)

        # np.roll returns a new array, so the previous buffer is left intact.
        updated = np.roll(history, -count)
        updated[-count:] = values
        self.tone_data[tone] = updated
        self.tone_lines[tone].setData(updated)
# Make sure a Qt application object exists before any widget is constructed;
# the string is the application name handed to pyqtgraph's mkQApp.
mkQApp("Correlation matrix display")
# Module-level window instance; process_audio_chunk pushes its results into it.
gui = DetectorGui()
def read_audio_from_stdin(chunk_size, process_chunk):
    """Read raw int16 audio from stdin and hand it to *process_chunk*.

    The loop runs until stdin reaches end-of-file or the module-level
    ``keep_running`` flag becomes false.  A read that ends in the middle of
    a sample no longer raises: the dangling byte is carried over to the next
    read, and discarded if end-of-file follows.

    Args:
        chunk_size: Number of int16 samples requested per read.
        process_chunk: Callable invoked with each chunk as a one-dimensional
            ``np.int16`` array.
    """
    bytes_per_sample = np.dtype(np.int16).itemsize
    read_size = chunk_size * bytes_per_sample
    leftover = b''

    while keep_running:
        fresh = sys.stdin.buffer.read(read_size)
        # An empty read means there is no more data available.
        if not fresh:
            break

        data = leftover + fresh
        # np.frombuffer requires a whole number of samples.
        usable = len(data) - len(data) % bytes_per_sample
        leftover = data[usable:]
        if usable:
            process_chunk(np.frombuffer(data[:usable], dtype=np.int16))
sample_rate = 44100     # Hz, rate of the raw stream passed to anti_alias
note_length = 0.5       # seconds, length of each generated reference tone
guard_duration = 0.01   # seconds skipped at both ends of every correlation

# Time budget per chunk in seconds; exceeding it prints "FAILED".
_CHUNK_DEADLINE = 0.1

# Reference tones keyed by the sample rate they were generated for.
_reference_tone_cache = {}


def _reference_tones(rate):
    """Return ``{tone: waveform}`` for every entry of TONES at *rate*."""
    # NOTE(review): assumes filters.note() returns the same waveform for the
    # same arguments, so it can be generated once per rate -- confirm.
    tones = _reference_tone_cache.get(rate)
    if tones is None:
        tones = {
            tone: note(freq, note_length, rate=rate)
            for tone, freq in TONES.items()
        }
        _reference_tone_cache[rate] = tones
    return tones


def process_audio_chunk(audio_chunk):
    """Correlate one audio chunk against every tone and push it to the GUI.

    Steps: run the chunk through ``anti_alias``, normalise it by its peak
    magnitude, correlate it with the reference waveform of each tone, smooth
    the absolute correlation, then push both its mean and a subsampled copy
    into the module-level ``gui``.  Prints "FAILED" when the whole step took
    longer than the per-chunk time budget.

    Args:
        audio_chunk: One-dimensional array of samples at ``sample_rate``.
            An empty chunk is ignored.
    """
    started = time.perf_counter()
    if len(audio_chunk) == 0:
        return

    # anti_alias returns the filtered data, its new rate and a third value
    # that is not needed here.
    # NOTE(review): 4800 and 8 look like a target rate and a decimation
    # limit -- confirm against filters.anti_alias.
    data, rate, _decimation = anti_alias(audio_chunk, sample_rate, 4800, 8)
    if len(data) == 0:
        return

    # Scale by the peak magnitude (not the signed maximum) so that chunks
    # whose largest sample is zero or negative are normalised correctly; the
    # small offset avoids a division by zero on silence.
    data = data / (np.max(np.abs(data)) + 0.0001)

    # NOTE(review): 64 is presumably the smoothing window length -- confirm
    # against filters.smoothing_filter.
    correlations = {
        tone: smoothing_filter(
            np.abs(signal.correlate(data, pure, mode='same')), 64
        )
        for tone, pure in _reference_tones(rate).items()
    }

    guard_samples = int(guard_duration * rate)
    for tone, massage in correlations.items():
        gui.push_tone(tone, np.mean(massage))
        # An explicit end index keeps the slice valid when guard_samples is
        # 0 (``[0:-0]`` would be empty).
        end = len(massage) - guard_samples
        gui.push_tones(tone, massage[guard_samples:end:100])

    if time.perf_counter() - started > _CHUNK_DEADLINE:
        print("FAILED")
if __name__ == '__main__':
    # Entry point: read audio on a background thread while Qt owns the main
    # thread, then shut the reader down once the GUI event loop returns.
    sig.signal(sig.SIGINT, signal_handler)

    chunk_duration = 0.1  # seconds
    sample_rate = 44100
    channels = 1
    chunk_size = int(sample_rate * chunk_duration) * channels

    reader_thread = threading.Thread(
        target=read_audio_from_stdin,
        args=(chunk_size, process_audio_chunk),
        daemon=True,
    )
    reader_thread.start()

    pg.exec()

    # The event loop has returned: ask the reader to stop and give it a
    # moment to finish the chunk in flight.  It may be parked in a blocking
    # stdin read that never returns, so do not wait indefinitely; the thread
    # is a daemon and will not keep the process alive.
    keep_running = False
    reader_thread.join(timeout=1.0)