from threading import Thread import SoapySDR as soapy import prefixed from formats import * import sys import subprocess import struct from soapyhelpers import * from samplerates import * class Radio: FORMAT = 'CS16' SAMPLES = 8192 PORT = 8554 def __init__(self, name, device_info): self.name = name self.device_info = device_info self.run = False self.thread = None self.device = soapy.Device(device_info) if self.device is None: raise RuntimeError("Failed to connect to radio device") self.capabilities = self._get_capabilities() def configure(self, frequency): if self.is_streaming(): raise RuntimeError("Cannot configure radio while a stream is active") frequency = int(prefixed.Float(frequency)) bandwidth = 200000 sample_rates = preferred_sample_rates(self.capabilities['rx']['sample-rates']) if len(sample_rates) == 0: raise RuntimeError("No suitable sample rates are available") self.sample_rate, self.output_rate = sample_rates[0] self.device.setFrequency(soapy.SOAPY_SDR_RX, 0, frequency) self.device.setSampleRate(soapy.SOAPY_SDR_RX, 0, self.sample_rate) self.device.setBandwidth(soapy.SOAPY_SDR_RX, 0, bandwidth) # Set automatic gain self.device.setGainMode(soapy.SOAPY_SDR_RX, 0, True) return { 'frequency': self.device.getFrequency(soapy.SOAPY_SDR_RX, 0), 'sample-rate': self.device.getSampleRate(soapy.SOAPY_SDR_RX, 0), 'bandwidth': self.device.getBandwidth(soapy.SOAPY_SDR_RX, 0), 'gain-mode': 'auto' if self.device.getGainMode(soapy.SOAPY_SDR_RX, 0) else 'manual', } def get_info(self): return { 'name': self.name, 'device': self.device_info, 'capabilities': self.capabilities, 'stream-path': self._stream_path(), 'streaming': self.is_streaming(), } def _get_capabilities(self): def get_direction_capabilities(direction): return { 'antennas': self.device.listAntennas(direction, 0), 'gains': self.device.listGains(direction, 0), 'frequencies': self.device.listFrequencies(direction, 0), 'sample-rates': self.device.listSampleRates(direction, 0), 'bandwidths': self.device.listBandwidths(direction, 0), 'sensors': self.device.listSensors(direction, 0), 'formats': self.device.getStreamFormats(direction, 0), } return { 'rx': get_direction_capabilities(soapy.SOAPY_SDR_RX), 'tx': get_direction_capabilities(soapy.SOAPY_SDR_TX), 'clock-sources': self.device.listClockSources(), 'time-sources': self.device.listTimeSources(), 'register-interfaces': self.device.listRegisterInterfaces(), 'gpios': self.device.listGPIOBanks(), 'uarts': self.device.listUARTs(), } def is_streaming(self): return True if (self.thread and self.thread.is_alive()) else False def start_stream(self): if self.is_streaming(): raise RuntimeError('Stream thread is already running') self.run = True self.thread = Thread(target=self._stream_thread, daemon=True, args=()) self.thread.start() def end_stream(self): if self.thread is None: raise RuntimeError('No stream thread to terminate') self.run = False self.thread.join() self.thread = None def _stream_thread(self): self._init_stream() def is_alive(subprocess): return (subprocess.poll() is None) while self.run: # Check that the child processes are still running if (not is_alive(self.demod)) or (not is_alive(self.playback)): print('DSP chain failed, aborting stream.', file=sys.stderr) break result = self.device.readStream(self.stream, [self.buffer], Radio.SAMPLES) if result == 0: continue elif result.ret < 0: error = SoapyError(result.ret) if error is not SoapyError.Timeout: print("Stream read failed, aborting stream:", error, file=sys.stderr) break continue else: read_size = int(result.ret * 2) self.demod.stdin.write( struct.pack(FORMATS[Radio.FORMAT].packing % read_size, *self.buffer[:read_size]) ) self._cleanup_stream() def _init_stream(self): self.playback = subprocess.Popen( [ '/usr/bin/ffmpeg', '-f', 's16le', '-ar', str(self.output_rate), '-ac', '2', '-i', '-', '-f', 'rtsp', f"rtsp://localhost:{self._stream_path()}" ], stdin=subprocess.PIPE, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL ) self.demod = subprocess.Popen( ['/usr/bin/python3', 'fm_demod.py', '-f', 'CS16', '-s', str(self.sample_rate), '-d', str(self.output_rate)], stdin=subprocess.PIPE, stdout=self.playback.stdin, stderr=subprocess.DEVNULL ) self.buffer = np.array([0] * Radio.SAMPLES * 2, FORMATS[Radio.FORMAT].numpy) self.stream = self.device.setupStream(soapy.SOAPY_SDR_RX, FORMATS[Radio.FORMAT].soapy) result = self.device.activateStream(self.stream) if result != 0: raise RuntimeError(f"Error activating stream: {result}") def _cleanup_stream(self): self.run = False # Cleanup the streaming objects in the reverse order to the _init method. self.device.deactivateStream(self.stream) self.device.closeStream(self.stream) self.buffer = None # .terminate()/.kill() followed by .wait() is required to properly clear # killed zombie processes from the process table. # https://stackoverflow.com/a/41961462 self.demod.kill() self.demod.wait() self.demod = None self.playback.kill() self.playback.wait() self.playback = None def _stream_path(self): return f"{Radio.PORT}/radio/{self.name}" """ Quick and dirty test of the Radio class. """ if __name__ == '__main__': import time sdr = Radio('demo', {'driver': 'sdrplay'}) print('Configuring...') sdr.configure('105.5M') print('Configured.') print('Starting stream...') sdr.start_stream() print('Stream started.') # Let the stream play for a while time.sleep(15) print('Ending stream...') sdr.end_stream() print('Stream ended.')