#! /usr/bin/env python3 import sys import threading import numpy as np import signal as sig from tones import TONES from filters import anti_alias, bandpass_filter, note, smoothing_filter from scipy import signal from scipy.signal import butter, lfilter, decimate import pyqtgraph as pg from pyqtgraph.Qt import QtGui, QtWidgets, mkQApp import time keep_running = True def signal_handler(sig, frame): global keep_running print('SIGINT received. Stopping...') keep_running = False class DetectorGui(QtWidgets.QMainWindow): def __init__(self, *args, **kwargs): super(DetectorGui, self).__init__(*args, **kwargs) layout = pg.GraphicsLayoutWidget(show=True) self.setCentralWidget(layout) self.setWindowTitle('SELCAL Detector') self.resize(1280, 800) self.plot = layout.addPlot() legend_view = layout.addViewBox() legend = pg.LegendItem(offset=(0, 0)) legend.setParentItem(legend_view) color_map = pg.colormap.get('CET-C6s') colors = color_map.getLookupTable(nPts=len(TONES)) t = np.linspace(0, 500, 100) self.tone_data = {} self.tone_lines = {} self.tone_pens = {} for tone,color in zip(TONES.keys(), colors): self.tone_data[tone] = np.zeros(1000, dtype=np.float64) #np.array([], dtype=np.float64) #np.zeros(int(10000), dtype=np.float64) self.tone_lines[tone] = self.plot.plot(self.tone_data[tone], pen=pg.mkPen(color=color), name=tone) legend.addItem(self.tone_lines[tone], tone) self.tone_pens[tone] = pg.mkPen(color=color) self.plot.setLabel('left', 'Signal Correlation') self.plot.setLabel('bottom', 'Time (samples)') self.plot.showGrid(x=True, y=True) legend_view.setFixedWidth(80) layout.ci.layout.setColumnFixedWidth(1, 80) self.show() def set_position(self, pos): pass def push_tone(self, tone, value): self.tone_data[tone] = np.roll(self.tone_data[tone], -1) self.tone_data[tone][-1] = value self.tone_lines[tone].setData(self.tone_data[tone]) def push_tones(self, tone, values): s = time.perf_counter() #self.tone_data[tone] = np.append(self.tone_data[tone], values) a = time.perf_counter() self.tone_data[tone] = np.roll(self.tone_data[tone], -len(values)) self.tone_data[tone][-len(values):] = values self.tone_lines[tone].setData(self.tone_data[tone]) #self.tone_lines[tone].setData(values) g = time.perf_counter() #print(a-s, g-a) #self.plot.plot(values, pen=self.tone_pens[tone]) mkQApp("Correlation matrix display") gui = DetectorGui() def read_audio_from_stdin(chunk_size, process_chunk): global keep_running while keep_running: # 2 bytes per sample for int16 read_size = chunk_size * 2 data = sys.stdin.buffer.read(read_size) # Break the loop if no more data is available if not data: break # Convert the binary data to a numpy array of int16 audio_chunk = np.frombuffer(data, dtype=np.int16) process_chunk(audio_chunk) sample_rate = 44100 note_length = 0.5 guard_duration = 0.01 def process_audio_chunk(audio_chunk): global gui data = audio_chunk sample_rate = 44100 s = time.perf_counter() data, sample_rate, decimation = anti_alias(data, sample_rate, 4800, 8) data = data / (np.max(data) + 0.0001) pure_signals = {tone:note(freq, note_length, rate=sample_rate) for tone,freq in TONES.items()} aa = time.perf_counter() correlations = {tone:smoothing_filter(np.abs(signal.correlate(data, pure, mode='same')), 64) for tone,pure in pure_signals.items()} corr = time.perf_counter() guard_samples = int(guard_duration * sample_rate) for tone,massage in correlations.items(): #gui.push_tones(tone, massage) gui.push_tone(tone, np.mean(massage)) gui.push_tones(tone, massage[guard_samples:-guard_samples:100]) draw = time.perf_counter() #print(aa-s, corr-aa, draw-corr) if draw - s > 0.1: print("FAILED") if __name__ == '__main__': sig.signal(sig.SIGINT, signal_handler) chunk_duration = 0.1 # seconds sample_rate = 44100 channels = 1 chunk_size = int(sample_rate * chunk_duration) * channels reader_thread = threading.Thread(target=read_audio_from_stdin, args=(chunk_size, process_audio_chunk)) reader_thread.daemon = True reader_thread.start() pg.exec() # Wait... reader_thread.join()