import os
import struct
import subprocess
import sys
from threading import Thread

import numpy as np
import prefixed
import SoapySDR as soapy

# FORMATS, PACKINGS and TYPES used below are not defined in this file, so they
# are expected to come from this star import.
from formats import *


class Radio:
    """Receive samples from a SoapySDR device and play them back as audio.

    Samples are read on a background thread and written to the stdin of an
    ``fm_demod.py`` subprocess, whose stdout is connected to the stdin of a
    ``sox`` subprocess.
    """

    # Key used to look up the stream format, struct packing string and
    # buffer element type in FORMATS, PACKINGS and TYPES respectively.
    FORMAT = 'CS16'
    # Number of samples requested from the device per readStream call.
    SAMPLES = 8192

    def __init__(self, device_info):
        """Open the SoapySDR device described by *device_info*.

        Args:
            device_info: Device arguments passed straight to
                ``SoapySDR.Device`` (e.g. ``{'driver': 'sdrplay'}``).

        Raises:
            RuntimeError: If no device object was obtained.
        """
        self.device_info = device_info
        self.run = False
        self.thread = None

        self.device = soapy.Device(device_info)
        if self.device is None:
            raise RuntimeError("Failed to connect to radio device")

    def configure(self, frequency):
        """Tune RX channel 0 and report the settings the device accepted.

        Args:
            frequency: Centre frequency in any form ``prefixed.Float``
                accepts, e.g. ``'105.5M'``.

        Returns:
            A dict with the keys ``'frequency'``, ``'sample-rate'``,
            ``'bandwidth'`` and ``'gain-mode'`` (``'auto'`` or ``'manual'``),
            all read back from the device after configuration.
        """
        frequency = int(prefixed.Float(frequency))
        # NOTE: 384000 matches the '-s 384k' argument given to fm_demod.py in
        # _init_stream; keep the two in step.
        sample_rate = 384000
        bandwidth = 200000

        rx = soapy.SOAPY_SDR_RX
        self.device.setFrequency(rx, 0, frequency)
        self.device.setSampleRate(rx, 0, sample_rate)
        self.device.setBandwidth(rx, 0, bandwidth)

        # Set automatic gain
        self.device.setGainMode(rx, 0, True)

        return {
            'frequency': self.device.getFrequency(rx, 0),
            'sample-rate': self.device.getSampleRate(rx, 0),
            'bandwidth': self.device.getBandwidth(rx, 0),
            'gain-mode': 'auto' if self.device.getGainMode(rx, 0) else 'manual',
        }

    def start_stream(self):
        """Start the background streaming thread.

        Raises:
            RuntimeError: If a stream thread has already been started and
                not yet ended with ``end_stream``.
        """
        if self.thread:
            raise RuntimeError('Stream thread is already running')

        self.run = True
        self.thread = Thread(target=self._stream_thread, daemon=True, args=())
        self.thread.start()

    def end_stream(self):
        """Signal the streaming thread to stop and wait for it to finish.

        Raises:
            RuntimeError: If there is no stream thread to terminate.
        """
        if self.thread is None:
            raise RuntimeError('No stream thread to terminate')

        self.run = False
        self.thread.join()
        self.thread = None

    def _stream_thread(self):
        """Thread body: read from the device and feed the demodulator.

        Runs until ``self.run`` is cleared or a read fails. The stream and
        the subprocesses are always cleaned up on the way out, including
        when the loop exits because of an exception.
        """
        self._init_stream()
        try:
            while self.run:
                result = self.device.readStream(
                    self.stream, [self.buffer], Radio.SAMPLES
                )
                if result.ret < 1:
                    print('Stream read failed, exiting.', file=sys.stderr)
                    # Clear the flag the loop actually tests and leave at
                    # once, so a non-positive count is never used as a
                    # slice length below.
                    self.run = False
                    break

                # Each sample occupies two elements of the buffer.
                read_size = int(result.ret * 2)
                self.demod.stdin.write(
                    struct.pack(
                        PACKINGS[Radio.FORMAT] % read_size,
                        *self.buffer[:read_size]
                    )
                )
        finally:
            self._cleanup_stream()

    def _init_stream(self):
        """Spawn the playback/demodulator processes and activate the stream."""
        self.playback = subprocess.Popen(
            [
                '/usr/bin/sox', '-t', 'raw', '-r', '32000', '-b', '16',
                '-c', '2', '-L', '-e', 'signed-integer', '-', '-d',
            ],
            stdin=subprocess.PIPE,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        # The demodulator's stdout is wired directly to sox's stdin.
        self.demod = subprocess.Popen(
            [
                '/usr/bin/python3', 'fm_demod.py', '-f', 'CS16',
                '-s', '384k', '-d', '32k',
            ],
            stdin=subprocess.PIPE,
            stdout=self.playback.stdin,
            stderr=subprocess.DEVNULL,
        )
        # Two buffer elements per sample.
        self.buffer = np.array([0] * Radio.SAMPLES * 2, TYPES[Radio.FORMAT])
        self.stream = self.device.setupStream(
            soapy.SOAPY_SDR_RX, FORMATS[Radio.FORMAT]
        )
        self.device.activateStream(self.stream)

    def _cleanup_stream(self):
        """Tear down what ``_init_stream`` created, in reverse order."""
        self.device.deactivateStream(self.stream)
        self.device.closeStream(self.stream)
        self.buffer = None
        self.demod.kill()
        self.playback.kill()
        # Reap the killed children so they do not linger as zombies.
        self.demod.wait()
        self.playback.wait()


# Quick and dirty test of the Radio class.
if __name__ == '__main__':
    import time

    sdr = Radio({'driver': 'sdrplay'})

    print('Configuring...')
    sdr.configure('105.5M')
    print('Configured.')

    print('Starting stream...')
    sdr.start_stream()
    print('Stream started.')

    # Let the stream play for a while
    time.sleep(5)

    print('Ending stream...')
    sdr.end_stream()
    print('Stream ended.')