"""SoapySDR-backed FM radio source that streams demodulated audio over RTSP."""

# NOTE: the original import order is preserved on purpose; the star imports
# below may shadow one another, so reordering them could change which names win.
from threading import Thread
import SoapySDR as soapy
import prefixed
# `np` is used in _init_stream; import it explicitly instead of relying on one
# of the star imports below happening to re-export it.
import numpy as np
from formats import *
import sys
import subprocess
import struct
from soapyhelpers import *
from samplerates import *
from streamer import Streamer, is_alive


class Radio(Streamer):
    """A single SDR receiver whose output is FM-demodulated and published.

    Samples are read from the SoapySDR device, piped through an external
    ``fm_demod.py`` process and then into ``ffmpeg``, which publishes the
    audio to the address returned by ``self.stream_address()``.
    """

    REST_PATH = "radio"
    # Key into FORMATS selecting the sample format used end to end.
    FORMAT = "CS16"
    # Number of samples requested from the device per readStream() call.
    SAMPLES = 8192

    def __init__(self, name, device_info):
        """Open the SoapySDR device and cache its capabilities.

        Args:
            name: Human-readable name reported by get_info().
            device_info: Device arguments passed straight to ``soapy.Device``.

        Raises:
            RuntimeError: If the device object could not be created.
        """
        super().__init__()

        self.name = name
        self.device_info = device_info

        self.device = soapy.Device(device_info)
        if self.device is None:
            raise RuntimeError("Failed to connect to radio device")

        self.capabilities = self._get_capabilities()

    def configure(self, frequency):
        """Tune the receiver and return the settings the device reports back.

        Args:
            frequency: Centre frequency, parsed with ``prefixed.Float`` so SI
                prefixed strings such as ``"105.5M"`` are accepted.

        Returns:
            dict with "frequency", "sample-rate", "bandwidth" and "gain-mode"
            as read back from the device after configuration.

        Raises:
            RuntimeError: If a stream is active, or if no preferred sample
                rate is supported by the device.
        """
        if self.is_streaming():
            raise RuntimeError("Cannot configure radio while a stream is active")

        frequency = int(prefixed.Float(frequency))
        bandwidth = 200000

        sample_rates = preferred_sample_rates(self.capabilities["rx"]["sample-rates"])
        if len(sample_rates) == 0:
            raise RuntimeError("No suitable sample rates are available")
        # Each entry is an (input sample rate, output rate) pair; take the first.
        self.sample_rate, self.output_rate = sample_rates[0]

        self.device.setFrequency(soapy.SOAPY_SDR_RX, 0, frequency)
        self.device.setSampleRate(soapy.SOAPY_SDR_RX, 0, self.sample_rate)
        self.device.setBandwidth(soapy.SOAPY_SDR_RX, 0, bandwidth)

        # Set automatic gain
        self.device.setGainMode(soapy.SOAPY_SDR_RX, 0, True)

        return {
            "frequency": self.device.getFrequency(soapy.SOAPY_SDR_RX, 0),
            "sample-rate": self.device.getSampleRate(soapy.SOAPY_SDR_RX, 0),
            "bandwidth": self.device.getBandwidth(soapy.SOAPY_SDR_RX, 0),
            "gain-mode": "auto"
            if self.device.getGainMode(soapy.SOAPY_SDR_RX, 0)
            else "manual",
        }

    def get_info(self):
        """Return a dict describing this radio and its current stream state."""
        return {
            "name": self.name,
            "device": self.device_info,
            "capabilities": self.capabilities,
            "stream-path": self._stream_path(),
            "streaming": self.is_streaming(),
        }

    def _get_capabilities(self):
        """Query the device for its per-direction and global capabilities."""

        def get_direction_capabilities(direction):
            # All per-direction queries are made against channel 0.
            return {
                "antennas": self.device.listAntennas(direction, 0),
                "gains": self.device.listGains(direction, 0),
                "frequencies": self.device.listFrequencies(direction, 0),
                "sample-rates": self.device.listSampleRates(direction, 0),
                "bandwidths": self.device.listBandwidths(direction, 0),
                "sensors": self.device.listSensors(direction, 0),
                "formats": self.device.getStreamFormats(direction, 0),
            }

        return {
            "rx": get_direction_capabilities(soapy.SOAPY_SDR_RX),
            "tx": get_direction_capabilities(soapy.SOAPY_SDR_TX),
            "clock-sources": self.device.listClockSources(),
            "time-sources": self.device.listTimeSources(),
            "register-interfaces": self.device.listRegisterInterfaces(),
            "gpios": self.device.listGPIOBanks(),
            "uarts": self.device.listUARTs(),
        }

    def _stream_thread(self):
        """Pump samples from the device into the demodulator until stopped.

        Runs until ``self.run`` becomes false, a child process exits, or a
        non-timeout read error occurs. Stream resources are always released,
        even if setup or the loop raises.
        """
        try:
            self._init_stream()

            while self.run:
                # Check that the child processes are still running
                if (not is_alive(self.demod)) or (not is_alive(self.playback)):
                    print("DSP chain failed, aborting stream.", file=sys.stderr)
                    break

                result = self.device.readStream(
                    self.stream, [self.buffer], Radio.SAMPLES
                )

                # readStream() returns a result object; the sample count (or a
                # negative error code) is carried in its `.ret` field.
                if result.ret == 0:
                    continue

                if result.ret < 0:
                    error = SoapyError(result.ret)
                    if error is not SoapyError.Timeout:
                        print(
                            "Stream read failed, aborting stream:",
                            error,
                            file=sys.stderr,
                        )
                        break
                    continue

                # The buffer holds two values per sample (see _init_stream).
                read_size = int(result.ret * 2)
                try:
                    self.demod.stdin.write(
                        struct.pack(
                            FORMATS[Radio.FORMAT].packing % read_size,
                            *self.buffer[:read_size],
                        )
                    )
                except BrokenPipeError:
                    # The demodulator exited between the liveness check and
                    # the write; treat it like any other DSP chain failure.
                    print("DSP chain failed, aborting stream.", file=sys.stderr)
                    break
        finally:
            self._cleanup_stream()

    def _init_stream(self):
        """Start the playback/demodulator processes and activate the RX stream.

        Raises:
            RuntimeError: If the device refuses to activate the stream.
        """
        # ffmpeg reads raw 2-channel s16le audio on stdin and publishes it as
        # RTSP to stream_address().
        self.playback = subprocess.Popen(
            [
                "/usr/bin/ffmpeg",
                "-f",
                "s16le",
                "-ar",
                str(self.output_rate),
                "-ac",
                "2",
                "-i",
                "-",
                "-f",
                "rtsp",
                self.stream_address(),
            ],
            stdin=subprocess.PIPE,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )

        # The demodulator's stdout is wired directly into ffmpeg's stdin.
        self.demod = subprocess.Popen(
            [
                "/usr/bin/python3",
                "fm_demod.py",
                "-f",
                Radio.FORMAT,
                "-s",
                str(self.sample_rate),
                "-d",
                str(self.output_rate),
            ],
            stdin=subprocess.PIPE,
            stdout=self.playback.stdin,
            stderr=subprocess.DEVNULL,
        )

        # Two buffer elements per sample.
        self.buffer = np.zeros(Radio.SAMPLES * 2, FORMATS[Radio.FORMAT].numpy)
        self.stream = self.device.setupStream(
            soapy.SOAPY_SDR_RX, FORMATS[Radio.FORMAT].soapy
        )
        result = self.device.activateStream(self.stream)
        if result != 0:
            raise RuntimeError(f"Error activating stream: {result}")

    def _cleanup_stream(self):
        """Release everything _init_stream created, in reverse order.

        Safe to call after a partially completed _init_stream: any resource
        that was never created (or was already released) is skipped.
        """
        self.run = False

        # Cleanup the streaming objects in the reverse order to the _init method.
        stream = getattr(self, "stream", None)
        if stream is not None:
            # Clear the handle first so a closed stream is never reused.
            self.stream = None
            try:
                self.device.deactivateStream(stream)
            finally:
                self.device.closeStream(stream)
        self.buffer = None

        # .terminate()/.kill() followed by .wait() is required to properly clear
        # killed zombie processes from the process table.
        # https://stackoverflow.com/a/41961462
        for attribute in ("demod", "playback"):
            process = getattr(self, attribute, None)
            if process is None:
                continue
            process.kill()
            process.wait()
            setattr(self, attribute, None)


# Quick and dirty test of the Radio class.
if __name__ == "__main__":
    import time

    sdr = Radio("demo", {"driver": "sdrplay"})

    print("Configuring...")
    sdr.configure("105.5M")
    print("Configured.")

    print("Starting stream...")
    sdr.start_stream()
    print("Stream started.")

    # Let the stream play for a while
    time.sleep(15)

    print("Ending stream...")
    sdr.end_stream()
    print("Stream ended.")