#! /usr/bin/env python3 import sys import threading import numpy as np import signal import pyqtgraph as pg data1 = np.random.normal(size=300) ptr1 = 0 win = pg.GraphicsLayoutWidget(show=True) win.setWindowTitle('pyqtgraph example: Scrolling Plots') plot = win.addPlot() curve = plot.plot(data1) keep_running = True def signal_handler(sig, frame): global keep_running print('SIGINT received. Stopping...') keep_running = False def read_audio_from_stdin(chunk_size, process_chunk): global keep_running while keep_running: # 2 bytes per sample for int16 read_size = chunk_size * 2 data = sys.stdin.buffer.read(read_size) # Break the loop if no more data is available if not data: break # Convert the binary data to a numpy array of int16 audio_chunk = np.frombuffer(data, dtype=np.int16) process_chunk(audio_chunk) def process_audio_chunk(audio_chunk): # Example processing: simply print the chunk global data1, ptr1, curve print(f"Read chunk: {len(audio_chunk)}") data1[:-1] = data1[1:] # shift data in the array one sample left # (see also: np.roll) data1[-1] = len(audio_chunk) ptr1 += 1 curve.setData(data1) curve.setPos(ptr1, 0) if __name__ == '__main__': signal.signal(signal.SIGINT, signal_handler) chunk_duration = 0.1 # seconds sample_rate = 44100 channels = 2 chunk_size = int(sample_rate * chunk_duration) * channels reader_thread = threading.Thread(target=read_audio_from_stdin, args=(chunk_size, process_audio_chunk)) reader_thread.daemon = True reader_thread.start() pg.exec() # Wait... reader_thread.join() ''' # 1) Simplest approach -- update data in the array such that plot appears to scroll # In these examples, the array size is fixed. p1 = win.addPlot() p2 = win.addPlot() data1 = np.random.normal(size=300) curve1 = p1.plot(data1) curve2 = p2.plot(data1) ptr1 = 0 def update1(): global data1, ptr1 data1[:-1] = data1[1:] # shift data in the array one sample left # (see also: np.roll) data1[-1] = np.random.normal() curve1.setData(data1) ptr1 += 1 curve2.setData(data1) curve2.setPos(ptr1, 0) '''