import sys
from threading import Thread

import numpy as np
import prefixed
import SoapySDR as soapy

from formats import *


class Radio:
    """Receive-side wrapper around a SoapySDR device.

    Opens the device, configures RX channel 0 and runs a background thread
    that repeatedly reads samples from the device into ``self.buffer``.
    """

    # Key into the FORMATS / TYPES tables provided by the ``formats`` module.
    FORMAT = 'CS16'
    # Number of samples requested from the device per readStream() call.
    SAMPLES = 8192

    def __init__(self, device_info):
        """Open the SoapySDR device described by *device_info*.

        Args:
            device_info: Passed unchanged to ``soapy.Device``.

        Raises:
            RuntimeError: If no device object was obtained.
        """
        self.device_info = device_info
        self.stream = None
        self.buffer = None
        self.port = None
        # Loop flag polled by the reader thread; clearing it stops the loop.
        self.run = False
        self.thread = None

        self.device = soapy.Device(device_info)
        if self.device is None:
            raise RuntimeError("Failed to connect to radio device")

    def configure(self, frequency):
        """Tune RX channel 0 and report the settings the device accepted.

        Args:
            frequency: Centre frequency; converted with ``prefixed.Float``
                and truncated to ``int`` (presumably so SI-prefixed strings
                are accepted -- verify against callers).

        Returns:
            dict: ``'frequency'``, ``'sample-rate'`` and ``'bandwidth'`` as
            read back from the device, plus ``'gain-mode'`` which is
            ``'auto'`` or ``'manual'``.
        """
        frequency = int(prefixed.Float(frequency))
        sample_rate = 384000
        bandwidth = 200000

        rx = soapy.SOAPY_SDR_RX
        self.device.setFrequency(rx, 0, frequency)
        self.device.setSampleRate(rx, 0, sample_rate)
        self.device.setBandwidth(rx, 0, bandwidth)

        # Set automatic gain
        self.device.setGainMode(rx, 0, True)

        # Read everything back: report what the device holds, not the request.
        return {
            'frequency': self.device.getFrequency(rx, 0),
            'sample-rate': self.device.getSampleRate(rx, 0),
            'bandwidth': self.device.getBandwidth(rx, 0),
            'gain-mode': 'auto' if self.device.getGainMode(rx, 0) else 'manual',
        }

    def start_stream(self):
        """Set up and activate the RX stream and start the reader thread.

        Raises:
            RuntimeError: If a reader thread is already registered. This
                stays true after a read failure until ``end_stream()`` has
                been called.
        """
        if self.thread:
            raise RuntimeError('Stream thread is already running')

        # Two buffer elements per sample (presumably interleaved I/Q for a
        # complex format -- confirm against the formats module).
        self.buffer = np.zeros(Radio.SAMPLES * 2, dtype=TYPES[Radio.FORMAT])
        self.stream = self.device.setupStream(
            soapy.SOAPY_SDR_RX, FORMATS[Radio.FORMAT])
        self.device.activateStream(self.stream)

        self.run = True
        self.thread = Thread(target=self._stream_thread, daemon=True, args=())
        self.thread.start()

    def _stream_thread(self):
        """Reader-thread body: read until stopped or until a read fails.

        The stream is deactivated and closed on the way out, including when
        ``readStream`` raises.
        """
        try:
            while self.run:
                result = self.device.readStream(
                    self.stream, [self.buffer], Radio.SAMPLES)
                if result.ret < 1:
                    print('Stream read failed, exiting.', file=sys.stderr)
                    # Clear the flag the loop actually tests so we stop
                    # here instead of spinning on the failing read.
                    self.run = False
        finally:
            self._cleanup_stream()

    def end_stream(self):
        """Ask the reader thread to stop and wait for it to finish.

        Raises:
            RuntimeError: If no reader thread is registered.
        """
        if self.thread is None:
            raise RuntimeError('No stream thread to terminate')

        self.run = False
        self.thread.join()
        self.thread = None

    def _cleanup_stream(self):
        """Deactivate and close the current stream."""
        # shutdown the stream
        self.device.deactivateStream(self.stream)
        self.device.closeStream(self.stream)