#!/usr/bin/env python3
"""SELCAL tone detector: reads raw int16 audio from stdin, correlates it
against the known SELCAL tones, and plots the correlation strength live."""

import sys
import threading
import signal as sig

import numpy as np
from scipy import signal
from scipy.signal import decimate
import pyqtgraph as pg
from pyqtgraph.Qt import QtWidgets, mkQApp

from tones import TONES
from filters import anti_alias, note

keep_running = True


def signal_handler(signum, frame):
    """Stop the stdin reader loop on Ctrl-C."""
    global keep_running
    print('SIGINT received. Stopping...')
    keep_running = False


class DetectorGui(QtWidgets.QMainWindow):
    """Main window: one scrolling correlation trace per SELCAL tone."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        layout = pg.GraphicsLayoutWidget(show=True)
        self.setCentralWidget(layout)
        self.setWindowTitle('SELCAL Detector')
        self.resize(1280, 800)

        self.plot = layout.addPlot()
        legend_view = layout.addViewBox()
        legend = pg.LegendItem(offset=(0, 0))
        legend.setParentItem(legend_view)

        # One distinct colour per tone from a cyclic colormap.
        color_map = pg.colormap.get('CET-C6s')
        colors = color_map.getLookupTable(nPts=len(TONES))

        # Fixed-length rolling buffer and plot curve for each tone.
        self.tone_data = {}
        self.tone_lines = {}
        for tone, color in zip(TONES.keys(), colors):
            self.tone_data[tone] = np.zeros(10000, dtype=np.float64)
            self.tone_lines[tone] = self.plot.plot(
                self.tone_data[tone], pen=pg.mkPen(color=color), name=tone)
            legend.addItem(self.tone_lines[tone], tone)

        self.plot.setLabel('left', 'Signal Correlation')
        self.plot.setLabel('bottom', 'Time (samples)')
        self.plot.showGrid(x=True, y=True)
        legend_view.setFixedWidth(80)
        layout.ci.layout.setColumnFixedWidth(1, 80)
        self.show()

    def set_position(self, pos):
        # Placeholder; position tracking is not implemented.
        pass

    def push_tone(self, tone, value):
        """Append a single correlation value to a tone's rolling buffer."""
        self.tone_data[tone] = np.roll(self.tone_data[tone], -1)
        self.tone_data[tone][-1] = value
        self.tone_lines[tone].setData(self.tone_data[tone])

    def push_tones(self, tone, values):
        """Append a block of correlation values to a tone's rolling buffer."""
        self.tone_data[tone] = np.roll(self.tone_data[tone], -len(values))
        self.tone_data[tone][-len(values):] = values
        self.tone_lines[tone].setData(self.tone_data[tone])


mkQApp("Correlation matrix display")
gui = DetectorGui()


def read_audio_from_stdin(chunk_size, process_chunk):
    """Read raw int16 samples from stdin in fixed-size chunks."""
    global keep_running
    while keep_running:
        # 2 bytes per sample for int16
        read_size = chunk_size * 2
        data = sys.stdin.buffer.read(read_size)
        # Break the loop if no more data is available
        if not data:
            break
        # Convert the binary data to a numpy array of int16
        audio_chunk = np.frombuffer(data, dtype=np.int16)
        process_chunk(audio_chunk)


sample_rate = 44100
note_length = 0.1  # seconds of reference tone used for correlation
N = 256
cumsum_convolution = np.ones(N) / N  # moving-average kernel to smooth the correlation


def process_audio_chunk(audio_chunk):
    """Correlate one audio chunk against every SELCAL tone and plot the result."""
    global gui
    data = audio_chunk
    sample_rate = 44100
    # Low-pass and decimate so the correlation runs at a manageable rate.
    data, sample_rate, decimation = anti_alias(data, sample_rate, 4800)
    # Reference sinusoid for each tone at the decimated sample rate.
    pure_signals = {tone: note(freq, note_length, rate=sample_rate)
                    for tone, freq in TONES.items()}
    # Correlation magnitude of the chunk against each reference tone.
    correlations = {tone: np.abs(signal.correlate(data, pure, mode='same'))
                    for tone, pure in pure_signals.items()}
    # Smooth with the moving average, then decimate by 4 to reduce plot points.
    massaged = {tone: decimate(np.convolve(correlation, cumsum_convolution, mode='valid'), 4)
                for tone, correlation in correlations.items()}
    print('processing done')
    # Note: this runs on the reader thread; Qt widgets are not strictly
    # thread-safe, which is tolerable for this simple display but worth knowing.
    for tone, massage in massaged.items():
        gui.push_tones(tone, massage)


if __name__ == '__main__':
    sig.signal(sig.SIGINT, signal_handler)
    chunk_duration = 0.1  # seconds
    sample_rate = 44100
    channels = 1
    chunk_size = int(sample_rate * chunk_duration) * channels

    reader_thread = threading.Thread(
        target=read_audio_from_stdin, args=(chunk_size, process_audio_chunk))
    reader_thread.daemon = True
    reader_thread.start()

    pg.exec()  # run the Qt event loop until the window is closed
    reader_thread.join()